/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.worker;

import com.google.common.annotations.VisibleForTesting;
import java.net.URI;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriBuilder;
import org.apache.commons.lang3.StringUtils;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.common.policies.data.ErrorData;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.runtime.KubernetesRuntimeFactory;
import org.apache.pulsar.functions.runtime.ProcessRuntimeFactory;
import org.apache.pulsar.functions.runtime.Runtime;
import org.apache.pulsar.functions.runtime.RuntimeFactory;
import org.apache.pulsar.functions.runtime.RuntimeSpawner;
import org.apache.pulsar.functions.runtime.ThreadRuntimeFactory;
import org.apache.pulsar.functions.worker.ConnectorsManager;
import org.apache.pulsar.functions.worker.FunctionAction;
import org.apache.pulsar.functions.worker.FunctionActioner;
import org.apache.pulsar.functions.worker.FunctionAssignmentTailer;
import org.apache.pulsar.functions.worker.FunctionRuntimeInfo;
import org.apache.pulsar.functions.worker.MembershipManager;
import org.apache.pulsar.functions.worker.Utils;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerInfo;
import org.apache.pulsar.functions.worker.WorkerService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FunctionRuntimeManager
implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(FunctionRuntimeManager.class);
    // worker id -> (fully qualified instance id -> assignment); written by setAssignment and the deleteAssignment overloads.
    @VisibleForTesting
    Map<String, Map<String, Function.Assignment>> workerIdToAssignments = new ConcurrentHashMap<String, Map<String, Function.Assignment>>();
    // fully qualified instance id -> runtime info for instances this manager has registered locally.
    @VisibleForTesting
    Map<String, FunctionRuntimeInfo> functionRuntimeInfoMap = new ConcurrentHashMap<String, FunctionRuntimeInfo>();
    @VisibleForTesting
    final WorkerConfig workerConfig;
    // START/STOP actions enqueued by insertStartAction/insertStopAction; the same queue is handed to the FunctionActioner in the constructor.
    @VisibleForTesting
    LinkedBlockingQueue<FunctionAction> actionQueue;
    // Created in initialize(); remains null until then.
    private FunctionAssignmentTailer functionAssignmentTailer;
    private FunctionActioner functionActioner;
    // Chosen in the constructor from the thread / process / Kubernetes container factory configuration.
    private RuntimeFactory runtimeFactory;
    private MembershipManager membershipManager;
    // Admin client obtained from the worker service; used to forward stop/restart/status calls for instances assigned to other workers.
    private final PulsarAdmin functionAdmin;
    private WorkerService workerService;
    // While true, insertStartAction/insertStopAction enqueue nothing and functionRuntimeInfoMap is not modified
    // by setFunctionRuntimeInfo/deleteFunctionRuntimeInfo.
    boolean isInitializePhase = false;

    /**
     * Builds the manager: stores its collaborators, selects a {@link RuntimeFactory} from the worker
     * configuration, and creates the action queue together with the {@link FunctionActioner} that is
     * given that queue.
     *
     * <p>The runtime factory is chosen from the first configured option, checked in the order
     * thread, process, Kubernetes.
     *
     * @param workerConfig worker configuration; supplies service URLs, auth settings and the container factory config
     * @param workerService source of the function admin client (and, later, of the Pulsar client used by {@link #initialize()})
     * @param dlogNamespace passed through to the {@link FunctionActioner}
     * @param membershipManager used to look up the workers that own remote assignments
     * @param connectorsManager passed through to the {@link FunctionActioner}
     * @throws RuntimeException if none of the thread, process or Kubernetes container factories is configured
     * @throws Exception declared by the signature; which call raises it is not visible from this class
     */
    public FunctionRuntimeManager(WorkerConfig workerConfig, WorkerService workerService, Namespace dlogNamespace, MembershipManager membershipManager, ConnectorsManager connectorsManager) throws Exception {
        this.workerConfig = workerConfig;
        this.workerService = workerService;
        this.functionAdmin = workerService.getFunctionAdmin();
        // Client authentication / TLS settings copied from the worker config and handed to whichever runtime factory is built below.
        AuthenticationConfig authConfig = AuthenticationConfig.builder().clientAuthenticationPlugin(workerConfig.getClientAuthenticationPlugin()).clientAuthenticationParameters(workerConfig.getClientAuthenticationParameters()).tlsTrustCertsFilePath(workerConfig.getTlsTrustCertsFilePath()).useTls(workerConfig.isUseTls()).tlsAllowInsecureConnection(workerConfig.isTlsAllowInsecureConnection()).tlsHostnameVerificationEnable(workerConfig.isTlsHostnameVerificationEnable()).build();
        if (workerConfig.getThreadContainerFactory() != null) {
            this.runtimeFactory = new ThreadRuntimeFactory(workerConfig.getThreadContainerFactory().getThreadGroupName(), workerConfig.getPulsarServiceUrl(), workerConfig.getStateStorageServiceUrl(), authConfig);
        } else if (workerConfig.getProcessContainerFactory() != null) {
            this.runtimeFactory = new ProcessRuntimeFactory(workerConfig.getPulsarServiceUrl(), workerConfig.getStateStorageServiceUrl(), authConfig, workerConfig.getProcessContainerFactory().getJavaInstanceJarLocation(), workerConfig.getProcessContainerFactory().getPythonInstanceLocation(), workerConfig.getProcessContainerFactory().getLogDirectory());
        } else if (workerConfig.getKubernetesContainerFactory() != null) {
            // The Kubernetes-specific service/admin URLs fall back to the worker-level URLs when empty;
            // a null expected metrics collection interval is passed as -1.
            this.runtimeFactory = new KubernetesRuntimeFactory(workerConfig.getKubernetesContainerFactory().getK8Uri(), workerConfig.getKubernetesContainerFactory().getJobNamespace(), workerConfig.getKubernetesContainerFactory().getPulsarDockerImageName(), workerConfig.getKubernetesContainerFactory().getPulsarRootDir(), workerConfig.getKubernetesContainerFactory().getSubmittingInsidePod(), workerConfig.getKubernetesContainerFactory().getInstallUserCodeDependencies(), workerConfig.getKubernetesContainerFactory().getCustomLabels(), StringUtils.isEmpty((CharSequence)workerConfig.getKubernetesContainerFactory().getPulsarServiceUrl()) ? workerConfig.getPulsarServiceUrl() : workerConfig.getKubernetesContainerFactory().getPulsarServiceUrl(), StringUtils.isEmpty((CharSequence)workerConfig.getKubernetesContainerFactory().getPulsarAdminUrl()) ? workerConfig.getPulsarWebServiceUrl() : workerConfig.getKubernetesContainerFactory().getPulsarAdminUrl(), workerConfig.getStateStorageServiceUrl(), authConfig, Integer.valueOf(workerConfig.getKubernetesContainerFactory().getExpectedMetricsCollectionInterval() == null ? -1 : workerConfig.getKubernetesContainerFactory().getExpectedMetricsCollectionInterval()));
        } else {
            throw new RuntimeException("Either Thread, Process or Kubernetes Container Factory need to be set");
        }
        this.actionQueue = new LinkedBlockingQueue();
        this.functionActioner = new FunctionActioner(this.workerConfig, this.runtimeFactory, dlogNamespace, this.actionQueue, connectorsManager);
        this.membershipManager = membershipManager;
    }

    /**
     * Replays the function-assignment topic from the earliest message, then starts the instances
     * assigned to this worker and starts the assignment tailer.
     *
     * <p>While the backlog is replayed the manager is in its initialize phase, during which no
     * start/stop actions are enqueued and the runtime-info map is left untouched; only the
     * assignment table is rebuilt. The phase flag is cleared in a {@code finally} so a failed replay
     * cannot leave the manager permanently suppressing actions.
     *
     * @throws RuntimeException wrapping any exception raised while creating the reader, replaying
     *         assignments or starting instances
     */
    public void initialize() {
        log.info("/** Initializing Runtime Manager **/");
        try {
            Reader<byte[]> reader = this.getWorkerService().getClient().newReader()
                    .topic(this.getWorkerConfig().getFunctionAssignmentTopic())
                    .readCompacted(true)
                    .startMessageId(MessageId.earliest)
                    .create();
            this.functionAssignmentTailer = new FunctionAssignmentTailer(this, reader);
            this.setInitializePhase(true);
            try {
                while (reader.hasMessageAvailable()) {
                    this.functionAssignmentTailer.processAssignment(reader.readNext());
                }
            } finally {
                this.setInitializePhase(false);
            }
            // Now that actions are no longer suppressed, start everything assigned to this worker.
            Map<String, Function.Assignment> assignmentMap = this.workerIdToAssignments.get(this.workerConfig.getWorkerId());
            if (assignmentMap != null) {
                for (Function.Assignment assignment : assignmentMap.values()) {
                    this.startFunctionInstance(assignment);
                }
            }
            this.functionAssignmentTailer.start();
        } catch (Exception e) {
            // The "{}" placeholder is required for the message argument to be rendered; the trailing
            // throwable is still logged with its stack trace.
            log.error("Failed to initialize function runtime manager: {}", e.getMessage(), e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Starts the function actioner and then the function assignment tailer, logging each step.
     *
     * <p>NOTE(review): {@code initialize()} also calls {@code functionAssignmentTailer.start()};
     * whether a second start is harmless depends on {@code FunctionAssignmentTailer} - confirm.
     */
    public void start() {
        log.info("/** Starting Function Runtime Manager **/");
        log.info("Initialize metrics sink...");

        log.info("Starting function actioner...");
        functionActioner.start();

        log.info("Starting function assignment tailer...");
        functionAssignmentTailer.start();
    }

    /**
     * Returns a snapshot of the assignment table.
     *
     * @return a new map of worker id to a new map of fully qualified instance id to assignment;
     *         both levels are copies, the assignment objects themselves are shared
     */
    public synchronized Map<String, Map<String, Function.Assignment>> getCurrentAssignments() {
        Map<String, Map<String, Function.Assignment>> snapshot = new HashMap<>();
        workerIdToAssignments.forEach((workerId, perWorker) -> snapshot.put(workerId, new HashMap<>(perWorker)));
        return snapshot;
    }

    /**
     * Looks up the assignment of a single function instance.
     *
     * @return the matching assignment, or {@code null} if no worker holds one
     */
    public synchronized Function.Assignment findFunctionAssignment(String tenant, String namespace, String functionName, int instanceId) {
        Function.Assignment found = findAssignment(tenant, namespace, functionName, instanceId);
        return found;
    }

    /**
     * Collects every assignment of the given function from this manager's assignment table.
     *
     * @return the matching assignments; empty if there are none
     */
    public synchronized Collection<Function.Assignment> findFunctionAssignments(String tenant, String namespace, String functionName) {
        Map<String, Map<String, Function.Assignment>> table = this.workerIdToAssignments;
        return findFunctionAssignments(tenant, namespace, functionName, table);
    }

    /**
     * Collects, across all workers in the supplied table, the assignments whose function details
     * match the given tenant, namespace and function name.
     *
     * @param workerIdToAssignments worker id to (instance id to assignment) table to search
     * @return a new list of the matching assignments, in table iteration order; empty if none match
     */
    public static Collection<Function.Assignment> findFunctionAssignments(String tenant, String namespace, String functionName, Map<String, Map<String, Function.Assignment>> workerIdToAssignments) {
        LinkedList<Function.Assignment> matches = new LinkedList<>();
        for (Map<String, Function.Assignment> perWorker : workerIdToAssignments.values()) {
            for (Function.Assignment candidate : perWorker.values()) {
                Function.FunctionDetails details = candidate.getInstance().getFunctionMetaData().getFunctionDetails();
                boolean sameFunction = tenant.equals(details.getTenant())
                        && namespace.equals(details.getNamespace())
                        && functionName.equals(details.getName());
                if (sameFunction) {
                    matches.add(candidate);
                }
            }
        }
        return matches;
    }

    /**
     * Removes each of the given assignments from the assignment table.
     *
     * <p>Only the table is updated; this path does not enqueue stop actions.
     */
    public synchronized void removeAssignments(Collection<Function.Assignment> assignments) {
        assignments.forEach(toRemove -> this.deleteAssignment(toRemove));
    }

    /**
     * Returns the status of one function instance.
     *
     * <ul>
     *   <li>No assignment found: a non-running status with failure message
     *       "Function has not been scheduled".</li>
     *   <li>Assigned to this worker: the status reported by the instance's runtime spawner, or a
     *       non-running status (carrying the startup exception message, if any) when no spawner or
     *       no runtime info is registered yet.</li>
     *   <li>Assigned to another worker: a temporary redirect to that worker is thrown.</li>
     * </ul>
     *
     * @param uri request URI used as the base of the redirect; may be {@code null} for internal calls
     * @return the instance status (never {@code null})
     * @throws WebApplicationException with a temporary redirect when another worker owns the
     *         instance, or with a 500 response when a redirect is needed but {@code uri} is {@code null}
     * @throws RuntimeException if fetching the status from the runtime fails or is interrupted
     */
    public InstanceCommunication.FunctionStatus getFunctionInstanceStatus(String tenant, String namespace, String functionName, int instanceId, URI uri) {
        // Externally managed runtimes are looked up under instance id -1.
        Function.Assignment assignment = this.runtimeFactory.externallyManaged()
                ? this.findAssignment(tenant, namespace, functionName, -1)
                : this.findAssignment(tenant, namespace, functionName, instanceId);
        // This check must precede any dereference of the assignment.
        if (assignment == null) {
            InstanceCommunication.FunctionStatus.Builder functionStatusBuilder = InstanceCommunication.FunctionStatus.newBuilder();
            functionStatusBuilder.setRunning(false);
            functionStatusBuilder.setFailureException("Function has not been scheduled");
            return functionStatusBuilder.build();
        }
        String assignedWorkerId = assignment.getWorkerId();
        String workerId = this.workerConfig.getWorkerId();

        if (assignedWorkerId.equals(workerId)) {
            FunctionRuntimeInfo functionRuntimeInfo = this.getFunctionRuntimeInfo(Utils.getFullyQualifiedInstanceId(assignment.getInstance()));
            // Runtime info can be absent even though the assignment exists (it is not recorded
            // during the initialize phase), so treat that the same as "no spawner yet".
            RuntimeSpawner runtimeSpawner = functionRuntimeInfo == null ? null : functionRuntimeInfo.getRuntimeSpawner();
            if (runtimeSpawner != null) {
                try {
                    InstanceCommunication.FunctionStatus.Builder functionStatusBuilder = InstanceCommunication.FunctionStatus.newBuilder(runtimeSpawner.getFunctionStatus(instanceId).get());
                    functionStatusBuilder.setWorkerId(assignedWorkerId);
                    return functionStatusBuilder.build();
                } catch (InterruptedException e) {
                    // Preserve the interrupt for callers further up the stack.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                } catch (ExecutionException e) {
                    throw new RuntimeException(e);
                }
            }
            InstanceCommunication.FunctionStatus.Builder functionStatusBuilder = InstanceCommunication.FunctionStatus.newBuilder();
            functionStatusBuilder.setRunning(false);
            functionStatusBuilder.setInstanceId(String.valueOf(instanceId));
            if (functionRuntimeInfo != null && functionRuntimeInfo.getStartupException() != null) {
                functionStatusBuilder.setFailureException(functionRuntimeInfo.getStartupException().getMessage());
            }
            functionStatusBuilder.setWorkerId(assignedWorkerId);
            return functionStatusBuilder.build();
        }

        // Owned by another worker: find it in the current membership and redirect the caller there.
        WorkerInfo workerInfo = null;
        for (WorkerInfo entry : this.membershipManager.getCurrentMembership()) {
            if (assignedWorkerId.equals(entry.getWorkerId())) {
                workerInfo = entry;
            }
        }
        if (workerInfo == null) {
            InstanceCommunication.FunctionStatus.Builder functionStatusBuilder = InstanceCommunication.FunctionStatus.newBuilder();
            functionStatusBuilder.setRunning(false);
            functionStatusBuilder.setInstanceId(String.valueOf(instanceId));
            functionStatusBuilder.setFailureException("Function has not been scheduled");
            return functionStatusBuilder.build();
        }
        if (uri == null) {
            throw new WebApplicationException(Response.serverError().status(Response.Status.INTERNAL_SERVER_ERROR).build());
        }
        URI redirect = UriBuilder.fromUri(uri).host(workerInfo.getWorkerHostname()).port(workerInfo.getPort()).build();
        throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
    }

    /**
     * Stops (and optionally restarts) a single function instance.
     *
     * <p>If this worker owns the instance it is stopped locally; otherwise the caller is redirected
     * to the owning worker.
     *
     * @param restart whether to start the instance again after stopping it
     * @param uri request URI used as the base of the redirect
     * @return 200 on a local stop; 501 when the runtime is externally managed; 400 when the instance
     *         has no assignment or its assigned worker is not in the current membership
     * @throws WebApplicationException with a temporary redirect to the owning worker, or with a 500
     *         response when a redirect is needed but {@code uri} is {@code null}
     * @throws Exception if stopping or restarting the local instance fails
     */
    public Response stopFunctionInstance(String tenant, String namespace, String functionName, int instanceId, boolean restart, URI uri) throws Exception {
        if (runtimeFactory.externallyManaged()) {
            return Response.status(Response.Status.NOT_IMPLEMENTED)
                    .type("application/json")
                    .entity(new ErrorData("Externally managed schedulers can't do per instance stop"))
                    .build();
        }

        Function.Assignment assignment = findAssignment(tenant, namespace, functionName, instanceId);
        String fullFunctionName = String.format("%s/%s/%s/%s", tenant, namespace, functionName, instanceId);
        if (assignment == null) {
            return Response.status(Response.Status.BAD_REQUEST)
                    .type("application/json")
                    .entity(new ErrorData(fullFunctionName + " doesn't exist"))
                    .build();
        }

        String ownerId = assignment.getWorkerId();
        String localWorkerId = workerConfig.getWorkerId();
        if (ownerId.equals(localWorkerId)) {
            stopFunction(Utils.getFullyQualifiedInstanceId(assignment.getInstance()), restart);
            return Response.status(Response.Status.OK).build();
        }

        // Not ours: locate the owning worker (the last membership entry with a matching id wins).
        WorkerInfo owner = null;
        for (WorkerInfo member : membershipManager.getCurrentMembership()) {
            if (assignment.getWorkerId().equals(member.getWorkerId())) {
                owner = member;
            }
        }
        if (owner == null) {
            return Response.status(Response.Status.BAD_REQUEST)
                    .type("application/json")
                    .entity(new ErrorData(fullFunctionName + " has not been assigned yet"))
                    .build();
        }
        if (uri == null) {
            throw new WebApplicationException(Response.serverError().status(Response.Status.INTERNAL_SERVER_ERROR).build());
        }
        URI target = UriBuilder.fromUri(uri).host(owner.getWorkerHostname()).port(owner.getPort()).build();
        throw new WebApplicationException(Response.temporaryRedirect(target).build());
    }

    /**
     * Stops (and optionally restarts) all instances of a function.
     *
     * <p>Instances assigned to this worker are stopped locally. For instances assigned to another
     * worker present in the current membership, the request is forwarded through the function admin
     * client. With an externally managed runtime only the first assignment is considered and the
     * forwarded call targets the whole function; otherwise each assignment is handled separately and
     * assignments whose worker is unknown are skipped.
     *
     * @param restart whether to restart instead of only stopping
     * @return 200 on success; 400 when the function has no assignments, or (externally managed
     *         only) when the assigned worker is not in the current membership
     * @throws Exception if a local stop/restart or a forwarded admin call fails
     */
    public Response stopFunctionInstances(String tenant, String namespace, String functionName, boolean restart) throws Exception {
        String fullFunctionName = String.format("%s/%s/%s", tenant, namespace, functionName);
        Collection<Function.Assignment> assignments = findFunctionAssignments(tenant, namespace, functionName);
        if (assignments.isEmpty()) {
            return Response.status(Response.Status.BAD_REQUEST)
                    .type("application/json")
                    .entity(new ErrorData(fullFunctionName + " has not been assigned yet"))
                    .build();
        }

        if (runtimeFactory.externallyManaged()) {
            Function.Assignment assignment = assignments.iterator().next();
            String ownerId = assignment.getWorkerId();
            String localWorkerId = workerConfig.getWorkerId();
            String instanceKey = Utils.getFullyQualifiedInstanceId(assignment.getInstance());
            if (ownerId.equals(localWorkerId)) {
                stopFunction(instanceKey, restart);
                return Response.status(Response.Status.OK).build();
            }

            WorkerInfo owner = null;
            for (WorkerInfo member : membershipManager.getCurrentMembership()) {
                if (assignment.getWorkerId().equals(member.getWorkerId())) {
                    owner = member;
                }
            }
            if (owner == null) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] has not been assigned yet", instanceKey);
                }
                return Response.status(Response.Status.BAD_REQUEST)
                        .type("application/json")
                        .entity(new ErrorData(fullFunctionName + " has not been assigned yet"))
                        .build();
            }
            if (restart) {
                functionAdmin.functions().restartFunction(tenant, namespace, functionName);
            } else {
                functionAdmin.functions().stopFunction(tenant, namespace, functionName);
            }
            return Response.status(Response.Status.OK).build();
        }

        for (Function.Assignment assignment : assignments) {
            String ownerId = assignment.getWorkerId();
            String localWorkerId = workerConfig.getWorkerId();
            String instanceKey = Utils.getFullyQualifiedInstanceId(assignment.getInstance());
            if (ownerId.equals(localWorkerId)) {
                stopFunction(instanceKey, restart);
                continue;
            }

            WorkerInfo owner = null;
            for (WorkerInfo member : membershipManager.getCurrentMembership()) {
                if (assignment.getWorkerId().equals(member.getWorkerId())) {
                    owner = member;
                }
            }
            if (owner == null) {
                // Unknown owner: skip this instance, leaving only a debug trace.
                if (log.isDebugEnabled()) {
                    log.debug("[{}] has not been assigned yet", instanceKey);
                }
            } else if (restart) {
                functionAdmin.functions().restartFunction(tenant, namespace, functionName, assignment.getInstance().getInstanceId());
            } else {
                functionAdmin.functions().stopFunction(tenant, namespace, functionName, assignment.getInstance().getInstanceId());
            }
        }
        return Response.status(Response.Status.OK).build();
    }

    /**
     * Stops, without restarting, every function instance currently assigned to this worker.
     *
     * <p>Does nothing except log a warning when the runtime is externally managed. A failure to stop
     * one instance is logged and does not prevent the remaining instances from being stopped.
     */
    public void stopAllOwnedFunctions() {
        if (runtimeFactory.externallyManaged()) {
            log.warn("Will not stop any functions since they are externally managed");
            return;
        }

        Map<String, Function.Assignment> owned = workerIdToAssignments.get(workerConfig.getWorkerId());
        if (owned == null) {
            return;
        }
        for (Function.Assignment assignment : owned.values()) {
            String instanceKey = Utils.getFullyQualifiedInstanceId(assignment.getInstance());
            try {
                stopFunction(instanceKey, false);
            } catch (Exception e) {
                log.warn("Failed to stop function {} - {}", instanceKey, e.getMessage());
            }
        }
    }

    /**
     * Stops the locally registered instance with the given id and, if requested, starts it again.
     *
     * <p>Does nothing beyond logging when no runtime info is registered for the id. If the restart
     * fails, the exception is recorded on the runtime info as its startup exception and rethrown.
     *
     * @param fullyQualifiedInstanceId key into the runtime-info map
     * @param restart whether to start the instance again after stopping it
     * @throws Exception if stopping or restarting fails
     */
    private void stopFunction(String fullyQualifiedInstanceId, boolean restart) throws Exception {
        String verb = restart ? "restarting" : "stopping";
        log.info("[{}] {}..", verb, fullyQualifiedInstanceId);

        FunctionRuntimeInfo runtimeInfo = getFunctionRuntimeInfo(fullyQualifiedInstanceId);
        if (runtimeInfo == null) {
            return;
        }

        functionActioner.stopFunction(runtimeInfo);
        if (!restart) {
            return;
        }
        try {
            functionActioner.startFunction(runtimeInfo);
        } catch (Exception ex) {
            log.info("{} Error re-starting function", fullyQualifiedInstanceId, ex);
            runtimeInfo.setStartupException(ex);
            throw ex;
        }
    }

    /**
     * Returns the status of every instance of a function.
     *
     * <p>With an externally managed runtime only the first assignment is examined: if this worker
     * owns it, one status per instance index in {@code [0, parallelism)} is collected locally;
     * otherwise the caller is redirected to the owning worker. For other runtimes each assignment is
     * resolved individually - locally when this worker owns it, through the function admin client
     * when it does not.
     *
     * @param uri request URI used as the base of the redirect
     * @return the collected statuses; an empty list when the function has no assignments, or a list
     *         carrying the error "Function not yet scheduled" when the owning worker is unknown
     * @throws PulsarAdminException if a remote status call through the admin client fails
     * @throws WebApplicationException with a temporary redirect to the owning worker, or with a 500
     *         response when a redirect is needed but {@code uri} is {@code null}
     */
    public InstanceCommunication.FunctionStatusList getAllFunctionStatus(String tenant, String namespace, String functionName, URI uri) throws PulsarAdminException {
        Collection<Function.Assignment> assignments = findFunctionAssignments(tenant, namespace, functionName);
        InstanceCommunication.FunctionStatusList.Builder statusList = InstanceCommunication.FunctionStatusList.newBuilder();
        if (assignments.isEmpty()) {
            return statusList.build();
        }

        if (!runtimeFactory.externallyManaged()) {
            for (Function.Assignment assignment : assignments) {
                InstanceCommunication.FunctionStatus status;
                if (workerConfig.getWorkerId().equals(assignment.getWorkerId())) {
                    status = getFunctionInstanceStatus(tenant, namespace, functionName, assignment.getInstance().getInstanceId(), null);
                } else {
                    status = functionAdmin.functions().getFunctionStatus(
                            assignment.getInstance().getFunctionMetaData().getFunctionDetails().getTenant(),
                            assignment.getInstance().getFunctionMetaData().getFunctionDetails().getNamespace(),
                            assignment.getInstance().getFunctionMetaData().getFunctionDetails().getName(),
                            assignment.getInstance().getInstanceId());
                }
                statusList.addFunctionStatusList(status);
            }
            return statusList.build();
        }

        Function.Assignment assignment = assignments.iterator().next();
        if (workerConfig.getWorkerId().equals(assignment.getWorkerId())) {
            int parallelism = assignment.getInstance().getFunctionMetaData().getFunctionDetails().getParallelism();
            for (int index = 0; index < parallelism; index++) {
                statusList.addFunctionStatusList(getFunctionInstanceStatus(tenant, namespace, functionName, index, null));
            }
            return statusList.build();
        }

        // Owned elsewhere: find the owner (last matching membership entry wins) and redirect.
        WorkerInfo owner = null;
        for (WorkerInfo member : membershipManager.getCurrentMembership()) {
            if (assignment.getWorkerId().equals(member.getWorkerId())) {
                owner = member;
            }
        }
        if (owner == null) {
            InstanceCommunication.FunctionStatusList.Builder notScheduled = InstanceCommunication.FunctionStatusList.newBuilder();
            notScheduled.setError("Function not yet scheduled");
            return notScheduled.build();
        }
        if (uri == null) {
            throw new WebApplicationException(Response.serverError().status(Response.Status.INTERNAL_SERVER_ERROR).build());
        }
        URI target = UriBuilder.fromUri(uri).host(owner.getWorkerHostname()).port(owner.getPort()).build();
        throw new WebApplicationException(Response.temporaryRedirect(target).build());
    }

    /**
     * Applies an incoming assignment: updates the existing entry when any worker already holds an
     * assignment for the same fully qualified instance id, otherwise adds it as new.
     *
     * @param newAssignment the assignment to apply
     */
    public synchronized void processAssignment(Function.Assignment newAssignment) {
        String instanceKey = Utils.getFullyQualifiedInstanceId(newAssignment.getInstance());
        boolean alreadyKnown = workerIdToAssignments.values().stream()
                .anyMatch(perWorker -> perWorker.containsKey(instanceKey));
        if (alreadyKnown) {
            updateAssignment(newAssignment);
            return;
        }
        addAssignment(newAssignment);
    }

    /**
     * Replaces an existing assignment with a changed one.
     *
     * <p>Nothing happens when the stored assignment equals the new one. Otherwise a stop is enqueued
     * for any locally registered runtime of the instance, a fresh runtime info is registered and
     * started if the new assignment targets this worker, and the stored assignment is swapped for
     * the new one.
     *
     * @param assignment the new assignment; an assignment for the same instance is expected to
     *        exist already
     */
    private void updateAssignment(Function.Assignment assignment) {
        String instanceKey = Utils.getFullyQualifiedInstanceId(assignment.getInstance());
        Function.Assignment previous = findAssignment(assignment);
        if (previous.equals(assignment)) {
            return;
        }

        FunctionRuntimeInfo running = functionRuntimeInfoMap.get(instanceKey);
        if (running != null) {
            insertStopAction(running);
        }

        boolean assignedToThisWorker = assignment.getWorkerId().equals(workerConfig.getWorkerId());
        if (assignedToThisWorker) {
            FunctionRuntimeInfo replacement = new FunctionRuntimeInfo();
            replacement.setFunctionInstance(assignment.getInstance());
            insertStartAction(replacement);
            setFunctionRuntimeInfo(instanceKey, replacement);
        }

        // Nothing above touches the assignment table, so the earlier lookup is still current.
        deleteAssignment(previous);
        setAssignment(assignment);
    }

    /**
     * Removes the assignment with the given instance id from whichever worker holds it.
     *
     * <p>If a runtime is registered locally for the id, a stop action is enqueued and its runtime
     * info is dropped first. A worker entry left without assignments is removed from the table.
     *
     * @param fullyQualifiedInstanceId id of the instance whose assignment is deleted
     */
    public synchronized void deleteAssignment(String fullyQualifiedInstanceId) {
        FunctionRuntimeInfo running = functionRuntimeInfoMap.get(fullyQualifiedInstanceId);
        if (running != null) {
            insertStopAction(running);
            deleteFunctionRuntimeInfo(fullyQualifiedInstanceId);
        }

        for (Map.Entry<String, Map<String, Function.Assignment>> workerEntry : workerIdToAssignments.entrySet()) {
            Map<String, Function.Assignment> perWorker = workerEntry.getValue();
            if (perWorker.remove(fullyQualifiedInstanceId) != null) {
                // Only the first worker holding the id is affected; prune it if it is now empty.
                if (perWorker.isEmpty()) {
                    workerIdToAssignments.remove(workerEntry.getKey());
                }
                break;
            }
        }
    }

    /**
     * Removes the given assignment from the table entry of the worker it names, and drops that
     * worker's entry once it holds no assignments. No action is enqueued.
     *
     * @param assignment the assignment to remove; its worker id selects the table entry
     */
    @VisibleForTesting
    void deleteAssignment(Function.Assignment assignment) {
        String instanceKey = Utils.getFullyQualifiedInstanceId(assignment.getInstance());
        Map<String, Function.Assignment> perWorker = workerIdToAssignments.get(assignment.getWorkerId());
        if (perWorker == null) {
            return;
        }
        perWorker.remove(instanceKey);
        if (perWorker.isEmpty()) {
            workerIdToAssignments.remove(assignment.getWorkerId());
        }
    }

    /**
     * Records a new assignment and, when it targets this worker, starts the instance.
     *
     * @param assignment the assignment to add
     */
    private void addAssignment(Function.Assignment assignment) {
        setAssignment(assignment);

        String localWorkerId = workerConfig.getWorkerId();
        boolean assignedToThisWorker = assignment.getWorkerId().equals(localWorkerId);
        if (assignedToThisWorker) {
            startFunctionInstance(assignment);
        }
    }

    /**
     * Enqueues a start for the assigned instance.
     *
     * <p>If no runtime info is registered for the instance one is created; if one already exists a
     * warning is logged and a stop is enqueued ahead of the start, restarting the function.
     *
     * @param assignment assignment whose instance should be started
     */
    private void startFunctionInstance(Function.Assignment assignment) {
        String instanceKey = Utils.getFullyQualifiedInstanceId(assignment.getInstance());
        if (functionRuntimeInfoMap.containsKey(instanceKey)) {
            log.warn("Function {} already running. Going to restart function.", functionRuntimeInfoMap.get(instanceKey));
            insertStopAction(functionRuntimeInfoMap.get(instanceKey));
        } else {
            FunctionRuntimeInfo fresh = new FunctionRuntimeInfo().setFunctionInstance(assignment.getInstance());
            setFunctionRuntimeInfo(instanceKey, fresh);
        }
        insertStartAction(functionRuntimeInfoMap.get(instanceKey));
    }

    /**
     * Returns the runtime-info map keyed by fully qualified instance id.
     *
     * @return the manager's own map instance, not a copy
     */
    public Map<String, FunctionRuntimeInfo> getFunctionRuntimeInfos() {
        return functionRuntimeInfoMap;
    }

    /**
     * Asks every locally registered runtime to reset its metrics and waits for each reset to finish.
     *
     * <p>Skipped entirely for externally managed runtimes. Entries without a spawner or runtime are
     * ignored; a failure for one instance is logged and the loop continues.
     */
    public void updateRates() {
        if (runtimeFactory.externallyManaged()) {
            return;
        }
        for (Map.Entry<String, FunctionRuntimeInfo> registered : functionRuntimeInfoMap.entrySet()) {
            RuntimeSpawner spawner = registered.getValue().getRuntimeSpawner();
            if (spawner == null) {
                continue;
            }
            Runtime runtime = spawner.getRuntime();
            if (runtime == null) {
                continue;
            }
            try {
                runtime.resetMetrics().get();
            } catch (Exception e) {
                log.error("Failed to update stats for {}-{}", registered.getKey(), e.getMessage());
            }
        }
    }

    /**
     * Enqueues a STOP action for the given runtime info, unless the manager is in its initialize
     * phase, in which case nothing is enqueued.
     *
     * @param functionRuntimeInfo runtime info of the instance to stop
     * @throws RuntimeException if the thread is interrupted while enqueueing; the interrupt flag is
     *         restored and the {@link InterruptedException} is kept as the cause
     */
    @VisibleForTesting
    void insertStopAction(FunctionRuntimeInfo functionRuntimeInfo) {
        if (this.isInitializePhase) {
            return;
        }
        FunctionAction functionAction = new FunctionAction();
        functionAction.setAction(FunctionAction.Action.STOP);
        functionAction.setFunctionRuntimeInfo(functionRuntimeInfo);
        try {
            this.actionQueue.put(functionAction);
        } catch (InterruptedException ex) {
            // Re-assert the interrupt so callers can still observe it, and keep the cause.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while putting action", ex);
        }
    }

    /**
     * Enqueues a START action for the given runtime info, unless the manager is in its initialize
     * phase, in which case nothing is enqueued.
     *
     * @param functionRuntimeInfo runtime info of the instance to start
     * @throws RuntimeException if the thread is interrupted while enqueueing; the interrupt flag is
     *         restored and the {@link InterruptedException} is kept as the cause
     */
    @VisibleForTesting
    void insertStartAction(FunctionRuntimeInfo functionRuntimeInfo) {
        if (this.isInitializePhase) {
            return;
        }
        FunctionAction functionAction = new FunctionAction();
        functionAction.setAction(FunctionAction.Action.START);
        functionAction.setFunctionRuntimeInfo(functionRuntimeInfo);
        try {
            this.actionQueue.put(functionAction);
        } catch (InterruptedException ex) {
            // Re-assert the interrupt so callers can still observe it, and keep the cause.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while putting action", ex);
        }
    }

    /**
     * Searches every worker's assignments for the instance identified by the given coordinates.
     *
     * @return the first assignment found under the instance's fully qualified id, or {@code null}
     *         if no worker holds one
     */
    private Function.Assignment findAssignment(String tenant, String namespace, String functionName, int instanceId) {
        String instanceKey = Utils.getFullyQualifiedInstanceId(tenant, namespace, functionName, instanceId);
        for (Map<String, Function.Assignment> perWorker : workerIdToAssignments.values()) {
            Function.Assignment hit = perWorker.get(instanceKey);
            if (hit != null) {
                return hit;
            }
        }
        return null;
    }

    /**
     * Finds the stored assignment for the same function instance as the given assignment, using the
     * tenant, namespace, name and instance id carried by the given assignment.
     *
     * @return the stored assignment, or {@code null} if there is none
     */
    private Function.Assignment findAssignment(Function.Assignment assignment) {
        Function.FunctionDetails details = assignment.getInstance().getFunctionMetaData().getFunctionDetails();
        return findAssignment(
                details.getTenant(),
                details.getNamespace(),
                details.getName(),
                assignment.getInstance().getInstanceId());
    }

    /**
     * Stores the assignment under its worker id, keyed by the instance's fully qualified id,
     * replacing any assignment already stored under that key for that worker.
     *
     * <p>The per-worker map is created on first use with {@code computeIfAbsent}, so creating and
     * fetching it is a single atomic step on the concurrent outer map.
     *
     * @param assignment the assignment to store
     */
    @VisibleForTesting
    void setAssignment(Function.Assignment assignment) {
        this.workerIdToAssignments
                .computeIfAbsent(assignment.getWorkerId(), workerId -> new HashMap<>())
                .put(Utils.getFullyQualifiedInstanceId(assignment.getInstance()), assignment);
    }

    /**
     * Drops the runtime info registered under the given id; a no-op during the initialize phase.
     *
     * @param fullyQualifiedInstanceId key of the entry to remove
     */
    private void deleteFunctionRuntimeInfo(String fullyQualifiedInstanceId) {
        if (isInitializePhase) {
            return;
        }
        functionRuntimeInfoMap.remove(fullyQualifiedInstanceId);
    }

    /**
     * Registers runtime info under the given id; a no-op during the initialize phase.
     *
     * @param fullyQualifiedInstanceId key to register under
     * @param functionRuntimeInfo runtime info to store
     */
    private void setFunctionRuntimeInfo(String fullyQualifiedInstanceId, FunctionRuntimeInfo functionRuntimeInfo) {
        if (isInitializePhase) {
            return;
        }
        functionRuntimeInfoMap.put(fullyQualifiedInstanceId, functionRuntimeInfo);
    }

    /**
     * Stops all functions owned by this worker, then closes the actioner, the assignment tailer and
     * the runtime factory.
     *
     * <p>The assignment tailer only exists after {@code initialize()} has run, so it is
     * null-checked; closing a manager that was never initialized therefore still releases the
     * runtime factory.
     *
     * @throws Exception if closing any component fails
     */
    @Override
    public void close() throws Exception {
        this.stopAllOwnedFunctions();
        this.functionActioner.close();
        if (this.functionAssignmentTailer != null) {
            this.functionAssignmentTailer.close();
        }
        if (this.runtimeFactory != null) {
            this.runtimeFactory.close();
        }
    }

    /**
     * Looks up the runtime info registered under the given id.
     *
     * @return the runtime info, or {@code null} if none is registered
     */
    private FunctionRuntimeInfo getFunctionRuntimeInfo(String fullyQualifiedInstanceId) {
        FunctionRuntimeInfo registered = functionRuntimeInfoMap.get(fullyQualifiedInstanceId);
        return registered;
    }

    /**
     * @return the worker configuration this manager was constructed with
     */
    public WorkerConfig getWorkerConfig() {
        return workerConfig;
    }

    /**
     * @return the runtime factory selected in the constructor
     */
    public RuntimeFactory getRuntimeFactory() {
        return runtimeFactory;
    }

    /**
     * @return the worker service this manager was constructed with
     */
    public WorkerService getWorkerService() {
        return workerService;
    }

    /**
     * Sets the initialize-phase flag. While the flag is {@code true}, start/stop actions are not
     * enqueued and the runtime-info map is not modified.
     *
     * @param isInitializePhase new value of the flag
     */
    public void setInitializePhase(boolean isInitializePhase) {
        this.isInitializePhase = isInitializePhase;
    }

    /**
     * @return {@code true} while the manager is in its initialize phase
     */
    public boolean isInitializePhase() {
        return isInitializePhase;
    }
}

