/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.worker;

import com.google.common.annotations.VisibleForTesting;
import java.net.URI;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriBuilder;
import org.apache.commons.lang3.StringUtils;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.common.functions.WorkerInfo;
import org.apache.pulsar.common.policies.data.ErrorData;
import org.apache.pulsar.common.policies.data.FunctionStats;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.runtime.KubernetesRuntimeFactory;
import org.apache.pulsar.functions.runtime.ProcessRuntimeFactory;
import org.apache.pulsar.functions.runtime.RuntimeFactory;
import org.apache.pulsar.functions.runtime.RuntimeSpawner;
import org.apache.pulsar.functions.runtime.ThreadRuntimeFactory;
import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
import org.apache.pulsar.functions.secretsproviderconfigurator.DefaultSecretsProviderConfigurator;
import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.FunctionInstanceId;
import org.apache.pulsar.functions.utils.Reflections;
import org.apache.pulsar.functions.worker.ConnectorsManager;
import org.apache.pulsar.functions.worker.FunctionActioner;
import org.apache.pulsar.functions.worker.FunctionAssignmentTailer;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.worker.FunctionRuntimeInfo;
import org.apache.pulsar.functions.worker.MembershipManager;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.WorkerUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FunctionRuntimeManager
implements AutoCloseable {
    // Class-wide SLF4J logger.
    private static final Logger log = LoggerFactory.getLogger(FunctionRuntimeManager.class);
    // All known assignments: worker id -> (fully qualified instance id -> assignment).
    @VisibleForTesting
    Map<String, Map<String, Function.Assignment>> workerIdToAssignments = new ConcurrentHashMap<String, Map<String, Function.Assignment>>();
    // Runtime bookkeeping for function instances, keyed by fully qualified instance id.
    @VisibleForTesting
    Map<String, FunctionRuntimeInfo> functionRuntimeInfoMap = new ConcurrentHashMap<String, FunctionRuntimeInfo>();
    // Worker configuration; supplies this worker's id, topics and runtime factory settings.
    @VisibleForTesting
    final WorkerConfig workerConfig;
    // Created in initialize() around a reader on the function assignment topic; started in start().
    private FunctionAssignmentTailer functionAssignmentTailer;
    // Built in the constructor from the selected runtime factory; used here to obtain runtime spawners.
    private FunctionActioner functionActioner;
    // Thread, process or Kubernetes runtime factory, chosen in the constructor from workerConfig.
    private RuntimeFactory runtimeFactory;
    // Source of the current worker membership, used to locate the worker owning an assignment.
    private MembershipManager membershipManager;
    // Admin client obtained from workerService.getFunctionAdmin(); used for restart/stats calls
    // concerning instances that are assigned to other workers.
    private final PulsarAdmin functionAdmin;
    private WorkerService workerService;
    // NOTE(review): presumably toggled by setInitializePhase(...) while initialize() replays the
    // assignment topic; the setter is not visible in this part of the file - confirm.
    boolean isInitializePhase = false;
    // Consulted in deleteAssignment(String) to check whether the function is still registered.
    private final FunctionMetaDataManager functionMetaDataManager;

    /**
     * Creates the runtime manager and selects the runtime factory from the worker configuration.
     *
     * <p>Exactly one of the thread, process or Kubernetes container factory sections is used,
     * checked in that order. The secrets provider configurator is instantiated reflectively from
     * {@code workerConfig.getSecretsProviderConfiguratorClassName()} when that name is non-empty,
     * otherwise a {@link DefaultSecretsProviderConfigurator} is used.
     *
     * @param workerConfig            worker configuration
     * @param workerService           worker service; supplies the function and broker admin clients
     * @param dlogNamespace           namespace handed to the {@link FunctionActioner}
     * @param membershipManager       source of the current worker membership
     * @param connectorsManager       connectors manager handed to the {@link FunctionActioner}
     * @param functionMetaDataManager function metadata manager
     * @throws RuntimeException if none of the three container factories is configured
     * @throws Exception        declared for the reflective/configurator and factory set-up calls
     */
    public FunctionRuntimeManager(WorkerConfig workerConfig, WorkerService workerService, Namespace dlogNamespace, MembershipManager membershipManager, ConnectorsManager connectorsManager, FunctionMetaDataManager functionMetaDataManager) throws Exception {
        this.workerConfig = workerConfig;
        this.workerService = workerService;
        this.functionAdmin = workerService.getFunctionAdmin();

        // Typed as SecretsProviderConfigurator (not Object) so that init(...) resolves.
        SecretsProviderConfigurator secretsProviderConfigurator =
                !StringUtils.isEmpty(workerConfig.getSecretsProviderConfiguratorClassName())
                        ? (SecretsProviderConfigurator) Reflections.createInstance(
                                workerConfig.getSecretsProviderConfiguratorClassName(),
                                ClassLoader.getSystemClassLoader())
                        : new DefaultSecretsProviderConfigurator();
        secretsProviderConfigurator.init(workerConfig.getSecretsProviderConfiguratorConfig());

        // Authentication settings are only passed to the runtimes when authentication is enabled.
        AuthenticationConfig authConfig = null;
        if (workerConfig.isAuthenticationEnabled()) {
            authConfig = AuthenticationConfig.builder()
                    .clientAuthenticationPlugin(workerConfig.getClientAuthenticationPlugin())
                    .clientAuthenticationParameters(workerConfig.getClientAuthenticationParameters())
                    .tlsTrustCertsFilePath(workerConfig.getTlsTrustCertsFilePath())
                    .useTls(workerConfig.isUseTls())
                    .tlsAllowInsecureConnection(workerConfig.isTlsAllowInsecureConnection())
                    .tlsHostnameVerificationEnable(workerConfig.isTlsHostnameVerificationEnable())
                    .build();
        }

        if (workerConfig.getThreadContainerFactory() != null) {
            this.runtimeFactory = new ThreadRuntimeFactory(
                    workerConfig.getThreadContainerFactory().getThreadGroupName(),
                    workerConfig.getPulsarServiceUrl(),
                    workerConfig.getStateStorageServiceUrl(),
                    authConfig,
                    new ClearTextSecretsProvider(),
                    null,
                    null);
        } else if (workerConfig.getProcessContainerFactory() != null) {
            this.runtimeFactory = new ProcessRuntimeFactory(
                    workerConfig.getPulsarServiceUrl(),
                    workerConfig.getStateStorageServiceUrl(),
                    authConfig,
                    workerConfig.getProcessContainerFactory().getJavaInstanceJarLocation(),
                    workerConfig.getProcessContainerFactory().getPythonInstanceLocation(),
                    workerConfig.getProcessContainerFactory().getLogDirectory(),
                    workerConfig.getProcessContainerFactory().getExtraFunctionDependenciesDir(),
                    secretsProviderConfigurator,
                    workerConfig.isAuthenticationEnabled());
        } else if (workerConfig.getKubernetesContainerFactory() != null) {
            this.runtimeFactory = new KubernetesRuntimeFactory(
                    workerConfig.getKubernetesContainerFactory().getK8Uri(),
                    workerConfig.getKubernetesContainerFactory().getJobNamespace(),
                    workerConfig.getKubernetesContainerFactory().getPulsarDockerImageName(),
                    workerConfig.getKubernetesContainerFactory().getImagePullPolicy(),
                    workerConfig.getKubernetesContainerFactory().getPulsarRootDir(),
                    workerConfig.getKubernetesContainerFactory().getSubmittingInsidePod(),
                    workerConfig.getKubernetesContainerFactory().getInstallUserCodeDependencies(),
                    workerConfig.getKubernetesContainerFactory().getPythonDependencyRepository(),
                    workerConfig.getKubernetesContainerFactory().getPythonExtraDependencyRepository(),
                    workerConfig.getKubernetesContainerFactory().getExtraFunctionDependenciesDir(),
                    workerConfig.getKubernetesContainerFactory().getCustomLabels(),
                    workerConfig.getKubernetesContainerFactory().getPercentMemoryPadding(),
                    // Kubernetes-specific service/admin URLs fall back to the worker-level ones when unset.
                    StringUtils.isEmpty(workerConfig.getKubernetesContainerFactory().getPulsarServiceUrl())
                            ? workerConfig.getPulsarServiceUrl()
                            : workerConfig.getKubernetesContainerFactory().getPulsarServiceUrl(),
                    StringUtils.isEmpty(workerConfig.getKubernetesContainerFactory().getPulsarAdminUrl())
                            ? workerConfig.getPulsarWebServiceUrl()
                            : workerConfig.getKubernetesContainerFactory().getPulsarAdminUrl(),
                    workerConfig.getStateStorageServiceUrl(),
                    authConfig,
                    // -1 is passed when no expected metrics collection interval is configured.
                    Integer.valueOf(workerConfig.getKubernetesContainerFactory().getExpectedMetricsCollectionInterval() == null
                            ? -1
                            : workerConfig.getKubernetesContainerFactory().getExpectedMetricsCollectionInterval()),
                    workerConfig.getKubernetesContainerFactory().getChangeConfigMap(),
                    workerConfig.getKubernetesContainerFactory().getChangeConfigMapNamespace(),
                    workerConfig.getFunctionInstanceMinResources(),
                    secretsProviderConfigurator,
                    workerConfig.isAuthenticationEnabled());
        } else {
            throw new RuntimeException("Either Thread, Process or Kubernetes Container Factory need to be set");
        }

        this.functionActioner = new FunctionActioner(this.workerConfig, this.runtimeFactory, dlogNamespace, connectorsManager, workerService.getBrokerAdmin());
        this.membershipManager = membershipManager;
        this.functionMetaDataManager = functionMetaDataManager;
    }

    /**
     * Replays the compacted function assignment topic from the earliest message and then starts
     * the function instances assigned to this worker.
     *
     * <p>A reader on {@code getWorkerConfig().getFunctionAssignmentTopic()} is created and wrapped
     * in a {@link FunctionAssignmentTailer}; every message currently available is passed to the
     * tailer while the initialize-phase flag is set. Afterwards each assignment recorded for this
     * worker's id for which {@code needsStart} returns true is started.
     *
     * @throws RuntimeException wrapping any exception raised during the replay or start-up
     */
    public void initialize() {
        log.info("/** Initializing Runtime Manager **/");
        try {
            Reader<byte[]> reader = this.getWorkerService().getClient().newReader()
                    .topic(this.getWorkerConfig().getFunctionAssignmentTopic())
                    .readCompacted(true)
                    .startMessageId(MessageId.earliest)
                    .create();
            this.functionAssignmentTailer = new FunctionAssignmentTailer(this, reader);

            // Drain everything already on the topic with the initialize-phase flag raised.
            this.setInitializePhase(true);
            while (reader.hasMessageAvailable()) {
                this.functionAssignmentTailer.processAssignment(reader.readNext());
            }
            this.setInitializePhase(false);

            // Only now start the instances that ended up assigned to this worker.
            Map<String, Function.Assignment> assignmentMap = this.workerIdToAssignments.get(this.workerConfig.getWorkerId());
            if (assignmentMap != null) {
                for (Function.Assignment assignment : assignmentMap.values()) {
                    if (this.needsStart(assignment)) {
                        this.startFunctionInstance(assignment);
                    }
                }
            }
        }
        catch (Exception e) {
            // The placeholder is required: without it the message argument is silently dropped.
            // The trailing exception is still logged with its stack trace.
            log.error("Failed to initialize function runtime manager: {}", e.getMessage(), e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Starts the {@link FunctionAssignmentTailer}.
     *
     * <p>In the visible code the tailer is only created by {@link #initialize()}, so that method
     * has to run first.
     */
    public void start() {
        log.info("/** Starting Function Runtime Manager **/");
        log.info("Starting function assignment tailer...");
        functionAssignmentTailer.start();
    }

    /**
     * Returns a snapshot of all known assignments.
     *
     * @return a new map of worker id to a new map of fully qualified instance id to assignment;
     *         changes to the returned maps do not affect this manager
     */
    public synchronized Map<String, Map<String, Function.Assignment>> getCurrentAssignments() {
        Map<String, Map<String, Function.Assignment>> snapshot = new HashMap<>();
        // Copy each per-worker map so callers never see the live inner maps.
        workerIdToAssignments.forEach((workerId, perWorker) -> snapshot.put(workerId, new HashMap<>(perWorker)));
        return snapshot;
    }

    /**
     * Synchronized lookup of the assignment of a single function instance.
     *
     * @param tenant       function tenant
     * @param namespace    function namespace
     * @param functionName function name
     * @param instanceId   instance id
     * @return the matching assignment, or {@code null} when no worker holds one
     */
    public synchronized Function.Assignment findFunctionAssignment(String tenant, String namespace, String functionName, int instanceId) {
        Function.Assignment match = findAssignment(tenant, namespace, functionName, instanceId);
        return match;
    }

    /**
     * Synchronized lookup of all assignments of a function across every worker known to this
     * manager.
     *
     * @param tenant       function tenant
     * @param namespace    function namespace
     * @param functionName function name
     * @return the matching assignments; empty when there are none
     */
    public synchronized Collection<Function.Assignment> findFunctionAssignments(String tenant, String namespace, String functionName) {
        Map<String, Map<String, Function.Assignment>> allAssignments = workerIdToAssignments;
        return findFunctionAssignments(tenant, namespace, functionName, allAssignments);
    }

    /**
     * Collects every assignment in the given map whose function details match the tenant,
     * namespace and name.
     *
     * @param tenant                function tenant to match
     * @param namespace             function namespace to match
     * @param functionName          function name to match
     * @param workerIdToAssignments worker id to (instance id to assignment) map to search
     * @return the matching assignments, in iteration order of the supplied maps
     */
    public static Collection<Function.Assignment> findFunctionAssignments(String tenant, String namespace, String functionName, Map<String, Map<String, Function.Assignment>> workerIdToAssignments) {
        LinkedList<Function.Assignment> matches = new LinkedList<Function.Assignment>();
        for (Map<String, Function.Assignment> perWorker : workerIdToAssignments.values()) {
            for (Function.Assignment candidate : perWorker.values()) {
                Function.FunctionDetails details = candidate.getInstance().getFunctionMetaData().getFunctionDetails();
                boolean sameFunction = tenant.equals(details.getTenant())
                        && namespace.equals(details.getNamespace())
                        && functionName.equals(details.getName());
                if (sameFunction) {
                    matches.add(candidate);
                }
            }
        }
        return matches;
    }

    /**
     * Removes each of the given assignments from the assignment bookkeeping by delegating to
     * {@code deleteAssignment(Function.Assignment)}.
     *
     * @param assignments assignments to remove
     */
    public synchronized void removeAssignments(Collection<Function.Assignment> assignments) {
        assignments.forEach(this::deleteAssignment);
    }

    /**
     * Restarts one function instance.
     *
     * <p>If the instance is assigned to this worker it is stopped and started again locally.
     * Otherwise the owning worker is looked up in the current membership and the caller is
     * redirected to it by throwing a {@link WebApplicationException} carrying a temporary
     * redirect built from {@code uri}.
     *
     * @param tenant       function tenant
     * @param namespace    function namespace
     * @param functionName function name
     * @param instanceId   instance to restart
     * @param uri          request URI used as the base of the redirect
     * @throws WebApplicationException NOT_IMPLEMENTED for externally managed runtimes,
     *         BAD_REQUEST when the instance has no assignment or its worker is not a current
     *         member, INTERNAL_SERVER_ERROR when a redirect is needed but {@code uri} is null,
     *         or a temporary redirect to the owning worker
     * @throws Exception if the local restart fails
     */
    public void restartFunctionInstance(String tenant, String namespace, String functionName, int instanceId, URI uri) throws Exception {
        if (runtimeFactory.externallyManaged()) {
            throw new WebApplicationException(Response.serverError()
                    .status(Response.Status.NOT_IMPLEMENTED)
                    .type("application/json")
                    .entity(new ErrorData("Externally managed schedulers can't do per instance stop"))
                    .build());
        }

        Function.Assignment assignment = findAssignment(tenant, namespace, functionName, instanceId);
        String fullFunctionName = String.format("%s/%s/%s/%s", tenant, namespace, functionName, instanceId);
        if (assignment == null) {
            throw new WebApplicationException(Response.serverError()
                    .status(Response.Status.BAD_REQUEST)
                    .type("application/json")
                    .entity(new ErrorData(fullFunctionName + " doesn't exist"))
                    .build());
        }

        // Local instance: restart in place.
        String owningWorkerId = assignment.getWorkerId();
        if (owningWorkerId.equals(workerConfig.getWorkerId())) {
            stopFunction(FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance()), true);
            return;
        }

        // Remote instance: find the owning worker among the current members (last match wins).
        WorkerInfo owner = null;
        for (WorkerInfo member : membershipManager.getCurrentMembership()) {
            if (assignment.getWorkerId().equals(member.getWorkerId())) {
                owner = member;
            }
        }
        if (owner == null) {
            throw new WebApplicationException(Response.serverError()
                    .status(Response.Status.BAD_REQUEST)
                    .type("application/json")
                    .entity(new ErrorData(fullFunctionName + " has not been assigned yet"))
                    .build());
        }
        if (uri == null) {
            throw new WebApplicationException(Response.serverError().status(Response.Status.INTERNAL_SERVER_ERROR).build());
        }
        URI redirect = UriBuilder.fromUri(uri).host(owner.getWorkerHostname()).port(owner.getPort()).build();
        throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
    }

    /**
     * Restarts all instances of a function.
     *
     * <p>For externally managed runtimes only the first assignment is considered: it is restarted
     * locally when owned by this worker, otherwise a function-level restart is issued through the
     * admin client once the owning worker is found in the current membership. For other runtimes
     * every assignment is handled individually: local ones are restarted in place, remote ones
     * are restarted per instance through the admin client, and assignments whose worker is not a
     * current member are skipped (logged at debug level).
     *
     * @param tenant       function tenant
     * @param namespace    function namespace
     * @param functionName function name
     * @throws WebApplicationException BAD_REQUEST when the function has no assignments, or (for
     *         externally managed runtimes) when the owning worker is not a current member
     * @throws Exception if a local restart or an admin call fails
     */
    public void restartFunctionInstances(String tenant, String namespace, String functionName) throws Exception {
        String fullFunctionName = String.format("%s/%s/%s", tenant, namespace, functionName);
        Collection<Function.Assignment> assignments = findFunctionAssignments(tenant, namespace, functionName);
        if (assignments.isEmpty()) {
            throw new WebApplicationException(Response.serverError()
                    .status(Response.Status.BAD_REQUEST)
                    .type("application/json")
                    .entity(new ErrorData(fullFunctionName + " has not been assigned yet"))
                    .build());
        }

        if (runtimeFactory.externallyManaged()) {
            Function.Assignment first = assignments.iterator().next();
            String owningWorkerId = first.getWorkerId();
            String localWorkerId = workerConfig.getWorkerId();
            String instanceFqn = FunctionCommon.getFullyQualifiedInstanceId(first.getInstance());
            if (owningWorkerId.equals(localWorkerId)) {
                stopFunction(instanceFqn, true);
                return;
            }
            WorkerInfo owner = null;
            for (WorkerInfo member : membershipManager.getCurrentMembership()) {
                if (first.getWorkerId().equals(member.getWorkerId())) {
                    owner = member;
                }
            }
            if (owner == null) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] has not been assigned yet", instanceFqn);
                }
                throw new WebApplicationException(Response.serverError()
                        .status(Response.Status.BAD_REQUEST)
                        .type("application/json")
                        .entity(new ErrorData(fullFunctionName + " has not been assigned yet"))
                        .build());
            }
            functionAdmin.functions().restartFunction(tenant, namespace, functionName);
            return;
        }

        for (Function.Assignment assignment : assignments) {
            String owningWorkerId = assignment.getWorkerId();
            String localWorkerId = workerConfig.getWorkerId();
            String instanceFqn = FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance());
            if (owningWorkerId.equals(localWorkerId)) {
                stopFunction(instanceFqn, true);
                continue;
            }
            WorkerInfo owner = null;
            for (WorkerInfo member : membershipManager.getCurrentMembership()) {
                if (assignment.getWorkerId().equals(member.getWorkerId())) {
                    owner = member;
                }
            }
            if (owner == null) {
                // Owning worker is not a current member: skip this instance.
                if (log.isDebugEnabled()) {
                    log.debug("[{}] has not been assigned yet", instanceFqn);
                }
                continue;
            }
            functionAdmin.functions().restartFunction(tenant, namespace, functionName, assignment.getInstance().getInstanceId());
        }
    }

    /**
     * Stops every function instance assigned to this worker, unless the runtimes are externally
     * managed, in which case nothing is stopped and a warning is logged.
     *
     * <p>A failure to stop one instance is logged and does not prevent the remaining instances
     * from being stopped.
     */
    public void stopAllOwnedFunctions() {
        if (runtimeFactory.externallyManaged()) {
            log.warn("Will not stop any functions since they are externally managed");
            return;
        }
        Map<String, Function.Assignment> owned = workerIdToAssignments.get(workerConfig.getWorkerId());
        if (owned == null) {
            return;
        }
        // Iterate over a sorted copy rather than the live map.
        TreeMap<String, Function.Assignment> ownedCopy = new TreeMap<String, Function.Assignment>(owned);
        for (Function.Assignment assignment : ownedCopy.values()) {
            String instanceFqn = FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance());
            try {
                stopFunction(instanceFqn, false);
            }
            catch (Exception e) {
                log.warn("Failed to stop function {} - {}", instanceFqn, e.getMessage());
            }
        }
    }

    /**
     * Stops the given instance and, when requested, starts it again.
     *
     * <p>Does nothing beyond logging when no runtime info is registered for the id.
     *
     * @param fullyQualifiedInstanceId id of the instance to stop
     * @param restart                  whether to start the instance again after stopping it
     * @throws Exception if the restart fails; the exception is also recorded on the runtime info
     */
    private void stopFunction(String fullyQualifiedInstanceId, boolean restart) throws Exception {
        String action = restart ? "restarting" : "stopping";
        log.info("[{}] {}..", action, fullyQualifiedInstanceId);

        FunctionRuntimeInfo runtimeInfo = getFunctionRuntimeInfo(fullyQualifiedInstanceId);
        if (runtimeInfo == null) {
            return;
        }
        conditionallyStopFunction(runtimeInfo);
        if (!restart) {
            return;
        }
        try {
            conditionallyStartFunction(runtimeInfo);
        }
        catch (Exception ex) {
            log.info("{} Error re-starting function", fullyQualifiedInstanceId, ex);
            runtimeInfo.setStartupException(ex);
            throw ex;
        }
    }

    /**
     * Returns the metrics of one function instance.
     *
     * <p>When the instance is assigned to this worker the metrics are read locally; empty stats
     * are returned when no runtime info or no runtime spawner is registered for it. When it is
     * assigned to another current member, the caller is redirected to that worker by throwing a
     * {@link WebApplicationException} carrying a temporary redirect built from {@code uri}.
     *
     * @param tenant       function tenant
     * @param namespace    function namespace
     * @param functionName function name
     * @param instanceId   instance whose stats are requested
     * @param uri          request URI used as the base of the redirect; may be {@code null} when
     *                     the instance is known to be local
     * @return the instance metrics, or an empty stats object when the instance has no assignment,
     *         no local runtime, or its worker is not a current member
     * @throws WebApplicationException INTERNAL_SERVER_ERROR when a redirect is needed but
     *         {@code uri} is null, or a temporary redirect to the owning worker
     */
    public FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData getFunctionInstanceStats(String tenant, String namespace, String functionName, int instanceId, URI uri) {
        // Externally managed runtimes are looked up under instance id -1 instead of the requested id.
        Function.Assignment assignment = this.runtimeFactory.externallyManaged()
                ? this.findAssignment(tenant, namespace, functionName, -1)
                : this.findAssignment(tenant, namespace, functionName, instanceId);
        if (assignment == null) {
            return new FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData();
        }

        String assignedWorkerId = assignment.getWorkerId();
        if (assignedWorkerId.equals(this.workerConfig.getWorkerId())) {
            String fullyQualifiedInstanceId = FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance());
            FunctionRuntimeInfo functionRuntimeInfo = this.getFunctionRuntimeInfo(fullyQualifiedInstanceId);
            // The runtime info can be absent even though the assignment exists (stopFunction
            // guards against the same case); report empty stats instead of failing with an NPE.
            if (functionRuntimeInfo == null || functionRuntimeInfo.getRuntimeSpawner() == null) {
                return new FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData();
            }
            return WorkerUtils.getFunctionInstanceStats(fullyQualifiedInstanceId, functionRuntimeInfo, instanceId).getMetrics();
        }

        // Remote instance: find the owning worker among the current members (last match wins).
        List<WorkerInfo> workerInfoList = this.membershipManager.getCurrentMembership();
        WorkerInfo workerInfo = null;
        for (WorkerInfo entry : workerInfoList) {
            if (assignment.getWorkerId().equals(entry.getWorkerId())) {
                workerInfo = entry;
            }
        }
        if (workerInfo == null) {
            return new FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData();
        }
        if (uri == null) {
            throw new WebApplicationException(Response.serverError().status(Response.Status.INTERNAL_SERVER_ERROR).build());
        }
        URI redirect = UriBuilder.fromUri(uri).host(workerInfo.getWorkerHostname()).port(workerInfo.getPort()).build();
        throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
    }

    /**
     * Returns the aggregated stats of all instances of a function.
     *
     * <p>For externally managed runtimes only the first assignment is considered: when this
     * worker owns it, stats for instance ids {@code 0..parallelism-1} are gathered locally;
     * otherwise the caller is redirected to the owning worker. For other runtimes the stats of
     * each assignment are read locally when owned by this worker and fetched through the admin
     * client otherwise.
     *
     * @param tenant       function tenant
     * @param namespace    function namespace
     * @param functionName function name
     * @param uri          request URI used as the base of a redirect
     * @return the function stats; an empty stats object when the function has no assignments or
     *         (externally managed only) the owning worker is not a current member
     * @throws PulsarAdminException declared for the admin-client stats call used for instances
     *         assigned to other workers
     * @throws WebApplicationException INTERNAL_SERVER_ERROR when a redirect is needed but
     *         {@code uri} is null, or a temporary redirect to the owning worker
     */
    public FunctionStats getFunctionStats(String tenant, String namespace, String functionName, URI uri) throws PulsarAdminException {
        Collection<Function.Assignment> assignments = findFunctionAssignments(tenant, namespace, functionName);
        FunctionStats functionStats = new FunctionStats();
        if (assignments.isEmpty()) {
            return functionStats;
        }

        if (runtimeFactory.externallyManaged()) {
            Function.Assignment first = assignments.iterator().next();
            boolean ownedLocally = workerConfig.getWorkerId().equals(first.getWorkerId());
            if (ownedLocally) {
                int parallelism = first.getInstance().getFunctionMetaData().getFunctionDetails().getParallelism();
                for (int instance = 0; instance < parallelism; ++instance) {
                    FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData metrics = getFunctionInstanceStats(tenant, namespace, functionName, instance, null);
                    FunctionStats.FunctionInstanceStats instanceStats = new FunctionStats.FunctionInstanceStats();
                    instanceStats.setInstanceId(instance);
                    instanceStats.setMetrics(metrics);
                    functionStats.addInstance(instanceStats);
                }
                return functionStats.calculateOverall();
            }

            // Owned by another worker: redirect the caller there (last matching member wins).
            WorkerInfo owner = null;
            for (WorkerInfo member : membershipManager.getCurrentMembership()) {
                if (first.getWorkerId().equals(member.getWorkerId())) {
                    owner = member;
                }
            }
            if (owner == null) {
                return functionStats;
            }
            if (uri == null) {
                throw new WebApplicationException(Response.serverError().status(Response.Status.INTERNAL_SERVER_ERROR).build());
            }
            URI redirect = UriBuilder.fromUri(uri).host(owner.getWorkerHostname()).port(owner.getPort()).build();
            throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
        }

        for (Function.Assignment assignment : assignments) {
            boolean ownedLocally = workerConfig.getWorkerId().equals(assignment.getWorkerId());
            FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData metrics;
            if (ownedLocally) {
                metrics = getFunctionInstanceStats(tenant, namespace, functionName, assignment.getInstance().getInstanceId(), null);
            } else {
                metrics = functionAdmin.functions().getFunctionStats(
                        assignment.getInstance().getFunctionMetaData().getFunctionDetails().getTenant(),
                        assignment.getInstance().getFunctionMetaData().getFunctionDetails().getNamespace(),
                        assignment.getInstance().getFunctionMetaData().getFunctionDetails().getName(),
                        assignment.getInstance().getInstanceId());
            }
            FunctionStats.FunctionInstanceStats instanceStats = new FunctionStats.FunctionInstanceStats();
            instanceStats.setInstanceId(assignment.getInstance().getInstanceId());
            instanceStats.setMetrics(metrics);
            functionStats.addInstance(instanceStats);
        }
        return functionStats.calculateOverall();
    }

    /**
     * Applies an incoming assignment: updates the existing entry when any worker already holds an
     * assignment for the same fully qualified instance id, otherwise adds it as a new one.
     *
     * @param newAssignment the assignment to apply
     */
    public synchronized void processAssignment(Function.Assignment newAssignment) {
        String fullyQualifiedInstanceId = FunctionCommon.getFullyQualifiedInstanceId(newAssignment.getInstance());

        // Probe each per-worker map directly and stop at the first hit, instead of copying every
        // assignment of every worker into a temporary map for a single containsKey check.
        boolean alreadyAssigned = false;
        for (Map<String, Function.Assignment> workerAssignments : this.workerIdToAssignments.values()) {
            if (workerAssignments.containsKey(fullyQualifiedInstanceId)) {
                alreadyAssigned = true;
                break;
            }
        }

        if (alreadyAssigned) {
            this.updateAssignment(newAssignment);
        } else {
            this.addAssignment(newAssignment);
        }
    }

    /**
     * Replaces the stored assignment of an instance with {@code assignment} and adjusts the local
     * runtime accordingly. Nothing happens when the stored assignment equals the new one.
     *
     * <p>The only visible caller, {@code processAssignment}, invokes this after confirming that an
     * assignment with the same fully qualified instance id exists; the lookup result is
     * dereferenced without a null check.
     *
     * @param assignment the new assignment for an already known instance
     */
    private void updateAssignment(Function.Assignment assignment) {
        String fullyQualifiedInstanceId = FunctionCommon.getFullyQualifiedInstanceId((Function.Instance)assignment.getInstance());
        Function.Assignment existingAssignment = this.findAssignment(assignment);
        if (!existingAssignment.equals((Object)assignment)) {
            Function.Assignment existing_assignment;
            FunctionRuntimeInfo newFunctionRuntimeInfo;
            FunctionRuntimeInfo functionRuntimeInfo = this._getFunctionRuntimeInfo(fullyQualifiedInstanceId);
            if (this.runtimeFactory.externallyManaged()) {
                if (!assignment.getInstance().equals((Object)existingAssignment.getInstance())) {
                    // Externally managed and the instance itself changed: stop whatever runs
                    // locally, then start afresh if the new assignment targets this worker.
                    if (functionRuntimeInfo != null) {
                        this.conditionallyStopFunction(functionRuntimeInfo);
                    }
                    if (assignment.getWorkerId().equals(this.workerConfig.getWorkerId())) {
                        if (this.needsStart(assignment)) {
                            newFunctionRuntimeInfo = new FunctionRuntimeInfo();
                            newFunctionRuntimeInfo.setFunctionInstance(assignment.getInstance());
                            this.conditionallyStartFunction(newFunctionRuntimeInfo);
                            this.functionRuntimeInfoMap.put(fullyQualifiedInstanceId, newFunctionRuntimeInfo);
                        }
                    } else {
                        // Now assigned to another worker: drop the local runtime bookkeeping.
                        this.functionRuntimeInfoMap.remove(fullyQualifiedInstanceId);
                    }
                } else if (assignment.getWorkerId().equals(this.workerConfig.getWorkerId())) {
                    // Externally managed, same instance, now assigned to this worker: nothing is
                    // stopped or started; a runtime spawner is obtained from the actioner, its
                    // runtime is reinitialized, and it is registered under the instance id.
                    newFunctionRuntimeInfo = new FunctionRuntimeInfo();
                    newFunctionRuntimeInfo.setFunctionInstance(assignment.getInstance());
                    RuntimeSpawner runtimeSpawner = this.functionActioner.getRuntimeSpawner(assignment.getInstance(), assignment.getInstance().getFunctionMetaData().getPackageLocation().getPackagePath());
                    runtimeSpawner.getRuntime().reinitialize();
                    newFunctionRuntimeInfo.setRuntimeSpawner(runtimeSpawner);
                    this.functionRuntimeInfoMap.put(fullyQualifiedInstanceId, newFunctionRuntimeInfo);
                } else {
                    // Externally managed, same instance, assigned to another worker: only the
                    // local bookkeeping is removed; the runtime is not stopped here.
                    this.functionRuntimeInfoMap.remove(fullyQualifiedInstanceId);
                }
            } else {
                // Runtimes managed by this worker: always stop the local runtime (if any), then
                // start a new one when the new assignment targets this worker.
                if (functionRuntimeInfo != null) {
                    this.conditionallyStopFunction(functionRuntimeInfo);
                }
                if (assignment.getWorkerId().equals(this.workerConfig.getWorkerId())) {
                    if (this.needsStart(assignment)) {
                        newFunctionRuntimeInfo = new FunctionRuntimeInfo();
                        newFunctionRuntimeInfo.setFunctionInstance(assignment.getInstance());
                        this.conditionallyStartFunction(newFunctionRuntimeInfo);
                        this.functionRuntimeInfoMap.put(fullyQualifiedInstanceId, newFunctionRuntimeInfo);
                    }
                } else {
                    this.functionRuntimeInfoMap.remove(fullyQualifiedInstanceId);
                }
            }
            // Swap the stored assignment: remove the old entry (looked up again by instance id,
            // it may sit under a different worker id) before recording the new one.
            if ((existing_assignment = this.findAssignment(assignment)) != null) {
                this.deleteAssignment(existing_assignment);
            }
            this.setAssignment(assignment);
        }
    }

    /**
     * Removes the assignment with the given instance id and shuts down its local runtime, if any.
     *
     * <p>The local runtime is stopped when the function is still known to the function metadata
     * manager or when more than one assignment of the function remains; otherwise it is
     * terminated. The id is then removed from the first worker map that contains it, and that
     * worker's entry is dropped once its map is empty.
     *
     * @param fullyQualifiedInstanceId id of the instance whose assignment is deleted
     */
    public synchronized void deleteAssignment(String fullyQualifiedInstanceId) {
        FunctionRuntimeInfo runtimeInfo = _getFunctionRuntimeInfo(fullyQualifiedInstanceId);
        if (runtimeInfo != null) {
            Function.FunctionDetails details = runtimeInfo.getFunctionInstance().getFunctionMetaData().getFunctionDetails();
            boolean stopOnly;
            if (functionMetaDataManager.containsFunction(details.getTenant(), details.getNamespace(), details.getName())) {
                stopOnly = true;
            } else {
                // Function is no longer registered: terminate only with the last remaining assignment.
                FunctionInstanceId parsedId = new FunctionInstanceId(fullyQualifiedInstanceId);
                String name = parsedId.getName();
                String namespace = parsedId.getNamespace();
                String tenant = parsedId.getTenant();
                Collection<Function.Assignment> remaining = findFunctionAssignments(tenant, namespace, name, workerIdToAssignments);
                stopOnly = remaining.size() > 1;
            }
            if (stopOnly) {
                conditionallyStopFunction(runtimeInfo);
            } else {
                conditionallyTerminateFunction(runtimeInfo);
            }
            functionRuntimeInfoMap.remove(fullyQualifiedInstanceId);
        }

        // Remove the id from the first worker map that actually held it.
        String owningWorkerId = null;
        for (Map.Entry<String, Map<String, Function.Assignment>> perWorker : workerIdToAssignments.entrySet()) {
            if (perWorker.getValue().remove(fullyQualifiedInstanceId) != null) {
                owningWorkerId = perWorker.getKey();
                break;
            }
        }
        if (owningWorkerId == null) {
            return;
        }
        Map<String, Function.Assignment> leftover = workerIdToAssignments.get(owningWorkerId);
        if (leftover != null && leftover.isEmpty()) {
            workerIdToAssignments.remove(owningWorkerId);
        }
    }

    /**
     * Removes the given assignment from the map of the worker named in the assignment, and drops
     * that worker's entry once its map is empty. Runtimes are not touched.
     *
     * @param assignment the assignment to remove
     */
    @VisibleForTesting
    void deleteAssignment(Function.Assignment assignment) {
        String instanceFqn = FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance());
        Map<String, Function.Assignment> workerAssignments = workerIdToAssignments.get(assignment.getWorkerId());
        if (workerAssignments == null) {
            return;
        }
        workerAssignments.remove(instanceFqn);
        if (workerAssignments.isEmpty()) {
            workerIdToAssignments.remove(assignment.getWorkerId());
        }
    }

    /**
     * Records a new assignment and starts the instance when it is assigned to this worker and
     * {@code needsStart} says so.
     *
     * @param assignment the assignment to add
     */
    private void addAssignment(Function.Assignment assignment) {
        setAssignment(assignment);
        boolean ownedLocally = assignment.getWorkerId().equals(workerConfig.getWorkerId());
        if (!ownedLocally) {
            return;
        }
        if (needsStart(assignment)) {
            startFunctionInstance(assignment);
        }
    }

    /**
     * Starts the instance of the given assignment. When runtime info is already registered for
     * the instance, a warning is logged and the existing runtime is stopped first; otherwise new
     * runtime info is created and registered.
     *
     * @param assignment assignment whose instance is started
     */
    private void startFunctionInstance(Function.Assignment assignment) {
        String instanceFqn = FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance());
        FunctionRuntimeInfo runtimeInfo = _getFunctionRuntimeInfo(instanceFqn);
        if (runtimeInfo != null) {
            log.warn("Function {} already running. Going to restart function.", runtimeInfo);
            conditionallyStopFunction(runtimeInfo);
        } else {
            runtimeInfo = new FunctionRuntimeInfo().setFunctionInstance(assignment.getInstance());
            functionRuntimeInfoMap.put(instanceFqn, runtimeInfo);
        }
        conditionallyStartFunction(runtimeInfo);
    }

    /**
     * Returns the runtime-info table keyed by fully qualified instance id.
     *
     * <p>The field {@code functionRuntimeInfoMap} is returned as-is; no copy is made here.
     *
     * @return the {@code functionRuntimeInfoMap} field
     */
    public Map<String, FunctionRuntimeInfo> getFunctionRuntimeInfos() {
        return this.functionRuntimeInfoMap;
    }

    /**
     * Searches every worker's assignment table for the given function instance.
     *
     * @param tenant       tenant of the function
     * @param namespace    namespace of the function
     * @param functionName name of the function
     * @param instanceId   instance id of the function instance
     * @return the first assignment found under the instance's fully qualified id,
     *         or {@code null} if no worker's table contains it
     */
    private Function.Assignment findAssignment(String tenant, String namespace, String functionName, int instanceId) {
        String instanceKey = FunctionCommon.getFullyQualifiedInstanceId(tenant, namespace, functionName, instanceId);
        for (Map<String, Function.Assignment> perWorkerAssignments : this.workerIdToAssignments.values()) {
            Function.Assignment match = perWorkerAssignments.get(instanceKey);
            if (match != null) {
                return match;
            }
        }
        return null;
    }

    /**
     * Searches for the recorded assignment of the same function instance as {@code assignment}.
     *
     * <p>Tenant, namespace and name are read from the instance's function details and the
     * lookup is delegated to the four-argument {@code findAssignment} overload.
     *
     * @param assignment the assignment identifying the instance to look for
     * @return the recorded assignment for that instance, or {@code null} if none is found
     */
    private Function.Assignment findAssignment(Function.Assignment assignment) {
        Function.Instance instance = assignment.getInstance();
        Function.FunctionDetails details = instance.getFunctionMetaData().getFunctionDetails();
        return this.findAssignment(details.getTenant(), details.getNamespace(), details.getName(), instance.getInstanceId());
    }

    /**
     * Stores an assignment in the per-worker assignment table, replacing any entry already
     * recorded for the same instance under the same worker.
     *
     * <p>The worker's table is created on first use with a single {@code computeIfAbsent}
     * call, so the lookup and the creation are not separate steps on the shared
     * {@code workerIdToAssignments} map.
     *
     * @param assignment the assignment to store under its worker id and fully qualified instance id
     */
    @VisibleForTesting
    void setAssignment(Function.Assignment assignment) {
        Map<String, Function.Assignment> workerAssignments = this.workerIdToAssignments.computeIfAbsent(assignment.getWorkerId(), id -> new HashMap<>());
        workerAssignments.put(FunctionCommon.getFullyQualifiedInstanceId((Function.Instance)assignment.getInstance()), assignment);
    }

    /**
     * Shuts this manager down: closes the assignment tailer, stops all functions owned by
     * this worker, and closes the runtime factory if one is set.
     *
     * <p>The steps run in that order. Each later step is executed even if an earlier one
     * throws, so a failure while closing the tailer does not leave owned functions running
     * or the runtime factory open. If several steps throw, the exception from the last
     * failing step is the one that propagates.
     *
     * @throws Exception if any of the shutdown steps fails
     */
    @Override
    public void close() throws Exception {
        try {
            this.functionAssignmentTailer.close();
        } finally {
            try {
                this.stopAllOwnedFunctions();
            } finally {
                if (this.runtimeFactory != null) {
                    this.runtimeFactory.close();
                }
            }
        }
    }

    /**
     * Synchronized lookup of the runtime info for a function instance.
     *
     * <p>Delegates to {@code _getFunctionRuntimeInfo} while holding this object's monitor.
     *
     * @param fullyQualifiedInstanceId key of the instance in {@code functionRuntimeInfoMap}
     * @return whatever {@code _getFunctionRuntimeInfo} returns for the key; that value comes
     *         from {@code functionRuntimeInfoMap.get}
     */
    public synchronized FunctionRuntimeInfo getFunctionRuntimeInfo(String fullyQualifiedInstanceId) {
        return this._getFunctionRuntimeInfo(fullyQualifiedInstanceId);
    }

    /**
     * Looks up the runtime info registered for a function instance.
     *
     * <p>This method does not synchronize by itself; {@code getFunctionRuntimeInfo} is the
     * synchronized entry point.
     *
     * @param fullyQualifiedInstanceId key of the instance in {@code functionRuntimeInfoMap}
     * @return the value stored in {@code functionRuntimeInfoMap} for the key, as returned by
     *         {@code get}
     */
    private FunctionRuntimeInfo _getFunctionRuntimeInfo(String fullyQualifiedInstanceId) {
        // Deliberately no "assignment exists but runtime info is missing" error here.
        // That state is legitimate: addAssignment records the assignment before
        // startFunctionInstance registers a runtime info (and startFunctionInstance itself
        // calls this method expecting a miss), and no runtime info is registered at all when
        // needsStart is false. The earlier form of this check compared id strings against
        // map values with containsValue and therefore could never fire.
        return this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId);
    }

    /**
     * Decides whether the instance described by {@code assignment} should be started,
     * based on the instance states stored in its function metadata.
     *
     * <ul>
     *   <li>No instance states recorded (null or empty map): start.</li>
     *   <li>Negative instance id: start if any recorded state is {@code RUNNING}.</li>
     *   <li>Otherwise: start if the state recorded for this instance id is {@code RUNNING},
     *       treating a missing entry as {@code RUNNING}.</li>
     * </ul>
     *
     * @param assignment the assignment to evaluate
     * @return {@code true} if the instance should be started
     */
    private boolean needsStart(Function.Assignment assignment) {
        Function.Instance instance = assignment.getInstance();
        Function.FunctionMetaData metaData = instance.getFunctionMetaData();
        Map<Integer, Function.FunctionState> instanceStates = metaData.getInstanceStatesMap();
        if (instanceStates == null || instanceStates.isEmpty()) {
            return true;
        }
        int instanceId = instance.getInstanceId();
        if (instanceId < 0) {
            for (Function.FunctionState recordedState : instanceStates.values()) {
                if (recordedState == Function.FunctionState.RUNNING) {
                    return true;
                }
            }
            return false;
        }
        return metaData.getInstanceStatesOrDefault(instanceId, Function.FunctionState.RUNNING) == Function.FunctionState.RUNNING;
    }

    /**
     * Asks the function actioner to start the function, unless the manager is in its
     * initialize phase, in which case nothing is done.
     *
     * @param functionRuntimeInfo runtime info passed to {@code functionActioner.startFunction}
     */
    private void conditionallyStartFunction(FunctionRuntimeInfo functionRuntimeInfo) {
        if (this.isInitializePhase) {
            return;
        }
        this.functionActioner.startFunction(functionRuntimeInfo);
    }

    /**
     * Asks the function actioner to stop the function, unless the manager is in its
     * initialize phase, in which case nothing is done.
     *
     * @param functionRuntimeInfo runtime info passed to {@code functionActioner.stopFunction}
     */
    private void conditionallyStopFunction(FunctionRuntimeInfo functionRuntimeInfo) {
        if (this.isInitializePhase) {
            return;
        }
        this.functionActioner.stopFunction(functionRuntimeInfo);
    }

    /**
     * Asks the function actioner to terminate the function, unless the manager is in its
     * initialize phase, in which case nothing is done.
     *
     * @param functionRuntimeInfo runtime info passed to {@code functionActioner.terminateFunction}
     */
    private void conditionallyTerminateFunction(FunctionRuntimeInfo functionRuntimeInfo) {
        if (this.isInitializePhase) {
            return;
        }
        this.functionActioner.terminateFunction(functionRuntimeInfo);
    }

    /**
     * Returns the worker configuration held by this manager.
     *
     * @return the {@code workerConfig} field
     */
    public WorkerConfig getWorkerConfig() {
        return this.workerConfig;
    }

    /**
     * Replaces the function actioner used by the {@code conditionally*Function} helpers
     * to start, stop and terminate functions.
     *
     * @param functionActioner the actioner to use from now on
     */
    public void setFunctionActioner(FunctionActioner functionActioner) {
        this.functionActioner = functionActioner;
    }

    /**
     * Returns the function actioner currently in use.
     *
     * @return the {@code functionActioner} field
     */
    public FunctionActioner getFunctionActioner() {
        return this.functionActioner;
    }

    /**
     * Returns the runtime factory held by this manager.
     *
     * @return the {@code runtimeFactory} field; {@code close()} null-checks this field,
     *         so it may be {@code null}
     */
    public RuntimeFactory getRuntimeFactory() {
        return this.runtimeFactory;
    }

    /**
     * Returns the worker service held by this manager.
     *
     * @return the {@code workerService} field
     */
    public WorkerService getWorkerService() {
        return this.workerService;
    }

    /**
     * Sets the initialize-phase flag.
     *
     * <p>While the flag is {@code true}, {@code conditionallyStartFunction},
     * {@code conditionallyStopFunction} and {@code conditionallyTerminateFunction}
     * do not call the function actioner.
     *
     * @param isInitializePhase new value of the flag
     */
    public void setInitializePhase(boolean isInitializePhase) {
        this.isInitializePhase = isInitializePhase;
    }

    /**
     * Reports whether the initialize-phase flag is set.
     *
     * @return the current value of the {@code isInitializePhase} field
     */
    public boolean isInitializePhase() {
        return this.isInitializePhase;
    }
}

