/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.worker;

import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.AbstractMessage;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.proto.Request;
import org.apache.pulsar.functions.runtime.ProcessRuntimeFactory;
import org.apache.pulsar.functions.runtime.Runtime;
import org.apache.pulsar.functions.runtime.RuntimeFactory;
import org.apache.pulsar.functions.runtime.RuntimeSpawner;
import org.apache.pulsar.functions.runtime.ThreadRuntimeFactory;
import org.apache.pulsar.functions.worker.ConnectorsManager;
import org.apache.pulsar.functions.worker.FunctionAction;
import org.apache.pulsar.functions.worker.FunctionActioner;
import org.apache.pulsar.functions.worker.FunctionAssignmentTailer;
import org.apache.pulsar.functions.worker.FunctionRuntimeInfo;
import org.apache.pulsar.functions.worker.MembershipManager;
import org.apache.pulsar.functions.worker.Utils;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FunctionRuntimeManager
implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(FunctionRuntimeManager.class);
    @VisibleForTesting
    Map<String, Map<String, Function.Assignment>> workerIdToAssignments = new ConcurrentHashMap<String, Map<String, Function.Assignment>>();
    @VisibleForTesting
    Map<String, FunctionRuntimeInfo> functionRuntimeInfoMap = new ConcurrentHashMap<String, FunctionRuntimeInfo>();
    @VisibleForTesting
    final WorkerConfig workerConfig;
    @VisibleForTesting
    LinkedBlockingQueue<FunctionAction> actionQueue;
    private long currentAssignmentVersion = 0L;
    private final FunctionAssignmentTailer functionAssignmentTailer;
    private FunctionActioner functionActioner;
    private RuntimeFactory runtimeFactory;
    private MembershipManager membershipManager;
    private final ConnectorsManager connectorsManager;

    public FunctionRuntimeManager(WorkerConfig workerConfig, PulsarClient pulsarClient, Namespace dlogNamespace, MembershipManager membershipManager, ConnectorsManager connectorsManager) throws Exception {
        this.workerConfig = workerConfig;
        this.connectorsManager = connectorsManager;
        Reader reader = pulsarClient.newReader().topic(this.workerConfig.getFunctionAssignmentTopic()).startMessageId(MessageId.earliest).create();
        this.functionAssignmentTailer = new FunctionAssignmentTailer(this, (Reader<byte[]>)reader);
        AuthenticationConfig authConfig = AuthenticationConfig.builder().clientAuthenticationPlugin(workerConfig.getClientAuthenticationPlugin()).clientAuthenticationParameters(workerConfig.getClientAuthenticationParameters()).tlsTrustCertsFilePath(workerConfig.getTlsTrustCertsFilePath()).useTls(workerConfig.isUseTls()).tlsAllowInsecureConnection(workerConfig.isTlsAllowInsecureConnection()).tlsHostnameVerificationEnable(workerConfig.isTlsHostnameVerificationEnable()).build();
        if (workerConfig.getThreadContainerFactory() != null) {
            this.runtimeFactory = new ThreadRuntimeFactory(workerConfig.getThreadContainerFactory().getThreadGroupName(), workerConfig.getPulsarServiceUrl(), workerConfig.getStateStorageServiceUrl(), authConfig);
        } else if (workerConfig.getProcessContainerFactory() != null) {
            this.runtimeFactory = new ProcessRuntimeFactory(workerConfig.getPulsarServiceUrl(), workerConfig.getStateStorageServiceUrl(), authConfig, workerConfig.getProcessContainerFactory().getJavaInstanceJarLocation(), workerConfig.getProcessContainerFactory().getPythonInstanceLocation(), workerConfig.getProcessContainerFactory().getLogDirectory());
        } else {
            throw new RuntimeException("Either Thread or Process Container Factory need to be set");
        }
        this.actionQueue = new LinkedBlockingQueue();
        this.functionActioner = new FunctionActioner(this.workerConfig, this.runtimeFactory, dlogNamespace, this.actionQueue, connectorsManager);
        this.membershipManager = membershipManager;
    }

    public void start() {
        log.info("/** Starting Function Runtime Manager **/");
        log.info("Initialize metrics sink...");
        log.info("Starting function actioner...");
        this.functionActioner.start();
        log.info("Starting function assignment tailer...");
        this.functionAssignmentTailer.start();
    }

    public synchronized Map<String, Map<String, Function.Assignment>> getCurrentAssignments() {
        HashMap<String, Map<String, Function.Assignment>> copy = new HashMap<String, Map<String, Function.Assignment>>();
        for (Map.Entry<String, Map<String, Function.Assignment>> entry : this.workerIdToAssignments.entrySet()) {
            HashMap<String, Function.Assignment> tmp = new HashMap<String, Function.Assignment>();
            tmp.putAll(entry.getValue());
            copy.put(entry.getKey(), tmp);
        }
        return copy;
    }

    public synchronized Function.Assignment findFunctionAssignment(String tenant, String namespace, String functionName, int instanceId) {
        return this.findAssignment(tenant, namespace, functionName, instanceId);
    }

    public synchronized Collection<Function.Assignment> findFunctionAssignments(String tenant, String namespace, String functionName) {
        return FunctionRuntimeManager.findFunctionAssignments(tenant, namespace, functionName, this.workerIdToAssignments);
    }

    public static Collection<Function.Assignment> findFunctionAssignments(String tenant, String namespace, String functionName, Map<String, Map<String, Function.Assignment>> workerIdToAssignments) {
        LinkedList<Function.Assignment> assignments = new LinkedList<Function.Assignment>();
        for (Map<String, Function.Assignment> entryMap : workerIdToAssignments.values()) {
            assignments.addAll(entryMap.values().stream().filter(assignment -> tenant.equals(assignment.getInstance().getFunctionMetaData().getFunctionDetails().getTenant()) && namespace.equals(assignment.getInstance().getFunctionMetaData().getFunctionDetails().getNamespace()) && functionName.equals(assignment.getInstance().getFunctionMetaData().getFunctionDetails().getName())).collect(Collectors.toList()));
        }
        return assignments;
    }

    public synchronized long getCurrentAssignmentVersion() {
        return new Long(this.currentAssignmentVersion);
    }

    public synchronized void removeAssignments(Collection<Function.Assignment> assignments) {
        for (Function.Assignment assignment : assignments) {
            this.deleteAssignment(assignment);
        }
    }

    public InstanceCommunication.FunctionStatus getFunctionInstanceStatus(String tenant, String namespace, String functionName, int instanceId) {
        Function.Assignment assignment = this.findAssignment(tenant, namespace, functionName, instanceId);
        String assignedWorkerId = assignment.getWorkerId();
        String workerId = this.workerConfig.getWorkerId();
        if (assignment == null) {
            InstanceCommunication.FunctionStatus.Builder functionStatusBuilder = InstanceCommunication.FunctionStatus.newBuilder();
            functionStatusBuilder.setRunning(false);
            functionStatusBuilder.setFailureException("Function has not been scheduled");
            return functionStatusBuilder.build();
        }
        InstanceCommunication.FunctionStatus functionStatus = null;
        if (assignedWorkerId.equals(workerId)) {
            FunctionRuntimeInfo functionRuntimeInfo = this.getFunctionRuntimeInfo(Utils.getFullyQualifiedInstanceId(assignment.getInstance()));
            RuntimeSpawner runtimeSpawner = functionRuntimeInfo.getRuntimeSpawner();
            if (runtimeSpawner != null) {
                try {
                    InstanceCommunication.FunctionStatus.Builder functionStatusBuilder = InstanceCommunication.FunctionStatus.newBuilder((InstanceCommunication.FunctionStatus)((InstanceCommunication.FunctionStatus)functionRuntimeInfo.getRuntimeSpawner().getFunctionStatus().get()));
                    functionStatusBuilder.setWorkerId(assignedWorkerId);
                    functionStatus = functionStatusBuilder.build();
                }
                catch (InterruptedException | ExecutionException e) {
                    throw new RuntimeException(e);
                }
            } else {
                InstanceCommunication.FunctionStatus.Builder functionStatusBuilder = InstanceCommunication.FunctionStatus.newBuilder();
                functionStatusBuilder.setRunning(false);
                functionStatusBuilder.setInstanceId(String.valueOf(instanceId));
                if (functionRuntimeInfo.getStartupException() != null) {
                    functionStatusBuilder.setFailureException(functionRuntimeInfo.getStartupException().getMessage());
                }
                functionStatusBuilder.setWorkerId(assignedWorkerId);
                functionStatus = functionStatusBuilder.build();
            }
        } else {
            List<WorkerInfo> workerInfoList = this.membershipManager.getCurrentMembership();
            WorkerInfo workerInfo = null;
            for (WorkerInfo entry : workerInfoList) {
                if (!assignment.getWorkerId().equals(entry.getWorkerId())) continue;
                workerInfo = entry;
            }
            if (workerInfo == null) {
                InstanceCommunication.FunctionStatus.Builder functionStatusBuilder = InstanceCommunication.FunctionStatus.newBuilder();
                functionStatusBuilder.setRunning(false);
                functionStatusBuilder.setInstanceId(String.valueOf(instanceId));
                functionStatusBuilder.setFailureException("Function has not been scheduled");
                return functionStatusBuilder.build();
            }
            Client client = ClientBuilder.newClient();
            String jsonResponse = (String)client.target(String.format("http://%s:%d/admin/functions/%s/%s/%s/%d/status", workerInfo.getWorkerHostname(), workerInfo.getPort(), tenant, namespace, functionName, instanceId)).request(new String[]{"text/plain"}).get(String.class);
            InstanceCommunication.FunctionStatus.Builder functionStatusBuilder = InstanceCommunication.FunctionStatus.newBuilder();
            try {
                org.apache.pulsar.functions.utils.Utils.mergeJson((String)jsonResponse, (AbstractMessage.Builder)functionStatusBuilder);
            }
            catch (IOException e) {
                log.warn("Got invalid function status response from {}", (Object)workerInfo, (Object)e);
                throw new RuntimeException(e);
            }
            functionStatusBuilder.setWorkerId(assignedWorkerId);
            functionStatus = functionStatusBuilder.build();
        }
        return functionStatus;
    }

    public InstanceCommunication.FunctionStatusList getAllFunctionStatus(String tenant, String namespace, String functionName) {
        Collection<Function.Assignment> assignments = this.findFunctionAssignments(tenant, namespace, functionName);
        InstanceCommunication.FunctionStatusList.Builder functionStatusListBuilder = InstanceCommunication.FunctionStatusList.newBuilder();
        if (assignments.isEmpty()) {
            return functionStatusListBuilder.build();
        }
        for (Function.Assignment assignment : assignments) {
            InstanceCommunication.FunctionStatus functionStatus = this.getFunctionInstanceStatus(assignment.getInstance().getFunctionMetaData().getFunctionDetails().getTenant(), assignment.getInstance().getFunctionMetaData().getFunctionDetails().getNamespace(), assignment.getInstance().getFunctionMetaData().getFunctionDetails().getName(), assignment.getInstance().getInstanceId());
            functionStatusListBuilder.addFunctionStatusList(functionStatus);
        }
        return functionStatusListBuilder.build();
    }

    public synchronized void processAssignmentUpdate(MessageId messageId, Request.AssignmentsUpdate assignmentsUpdate) {
        if (assignmentsUpdate.getVersion() > this.currentAssignmentVersion) {
            FunctionRuntimeInfo functionRuntimeInfo;
            Function.Assignment assignment;
            String fullyQualifiedInstanceId;
            HashMap<String, Function.Assignment> assignmentMap = new HashMap<String, Function.Assignment>();
            for (Object assignment2 : assignmentsUpdate.getAssignmentsList()) {
                assignmentMap.put(Utils.getFullyQualifiedInstanceId(assignment2.getInstance()), (Function.Assignment)assignment2);
            }
            HashMap<String, Function.Assignment> existingAssignmentMap = new HashMap<String, Function.Assignment>();
            for (Map map : this.workerIdToAssignments.values()) {
                existingAssignmentMap.putAll(map);
            }
            Map<String, Function.Assignment> assignmentsToAdd = this.diff(assignmentMap, existingAssignmentMap);
            Map<String, Function.Assignment> map = this.diff(existingAssignmentMap, assignmentMap);
            Map<String, Function.Assignment> existingAssignments = this.inCommon(assignmentMap, existingAssignmentMap);
            for (Map.Entry<String, Function.Assignment> assignmentEntry : assignmentsToAdd.entrySet()) {
                fullyQualifiedInstanceId = assignmentEntry.getKey();
                assignment = assignmentEntry.getValue();
                this.setAssignment(assignment);
                if (!assignment.getWorkerId().equals(this.workerConfig.getWorkerId())) continue;
                if (!this.functionRuntimeInfoMap.containsKey(fullyQualifiedInstanceId)) {
                    this.setFunctionRuntimeInfo(fullyQualifiedInstanceId, new FunctionRuntimeInfo().setFunctionInstance(assignment.getInstance()));
                } else {
                    log.warn("Function {} already running. Going to restart function.", (Object)this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId));
                    this.insertStopAction(this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId));
                }
                functionRuntimeInfo = this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId);
                this.insertStartAction(functionRuntimeInfo);
            }
            for (Map.Entry<String, Function.Assignment> assignmentEntry : map.entrySet()) {
                fullyQualifiedInstanceId = assignmentEntry.getKey();
                assignment = assignmentEntry.getValue();
                functionRuntimeInfo = this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId);
                if (functionRuntimeInfo != null) {
                    this.insertStopAction(functionRuntimeInfo);
                    this.deleteFunctionRuntimeInfo(fullyQualifiedInstanceId);
                }
                this.deleteAssignment(assignment);
            }
            for (Map.Entry<String, Function.Assignment> assignmentEntry : existingAssignments.entrySet()) {
                fullyQualifiedInstanceId = assignmentEntry.getKey();
                assignment = assignmentEntry.getValue();
                Function.Assignment existingAssignment = this.findAssignment(assignment);
                if (existingAssignment.equals((Object)assignment)) continue;
                FunctionRuntimeInfo functionRuntimeInfo2 = this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId);
                if (functionRuntimeInfo2 != null) {
                    this.insertStopAction(functionRuntimeInfo2);
                }
                if (!assignment.getWorkerId().equals(this.workerConfig.getWorkerId())) continue;
                FunctionRuntimeInfo newFunctionRuntimeInfo = new FunctionRuntimeInfo();
                newFunctionRuntimeInfo.setFunctionInstance(assignment.getInstance());
                this.insertStartAction(newFunctionRuntimeInfo);
                this.setFunctionRuntimeInfo(fullyQualifiedInstanceId, newFunctionRuntimeInfo);
                this.setAssignment(assignment);
            }
            this.currentAssignmentVersion = assignmentsUpdate.getVersion();
        } else {
            log.debug("Received out of date assignment update: {}", (Object)assignmentsUpdate);
        }
    }

    public Map<String, FunctionRuntimeInfo> getFunctionRuntimeInfos() {
        return this.functionRuntimeInfoMap;
    }

    public void updateRates() {
        for (Map.Entry<String, FunctionRuntimeInfo> entry : this.functionRuntimeInfoMap.entrySet()) {
            Runtime functionRuntime;
            RuntimeSpawner functionRuntimeSpawner = entry.getValue().getRuntimeSpawner();
            if (functionRuntimeSpawner == null || (functionRuntime = functionRuntimeSpawner.getRuntime()) == null) continue;
            try {
                functionRuntime.resetMetrics().get();
            }
            catch (Exception e) {
                log.error("Failed to update stats for {}-{}", (Object)entry.getKey(), (Object)e.getMessage());
            }
        }
    }

    @VisibleForTesting
    void insertStopAction(FunctionRuntimeInfo functionRuntimeInfo) {
        FunctionAction functionAction = new FunctionAction();
        functionAction.setAction(FunctionAction.Action.STOP);
        functionAction.setFunctionRuntimeInfo(functionRuntimeInfo);
        try {
            this.actionQueue.put(functionAction);
        }
        catch (InterruptedException ex) {
            throw new RuntimeException("Interrupted while putting action");
        }
    }

    @VisibleForTesting
    void insertStartAction(FunctionRuntimeInfo functionRuntimeInfo) {
        FunctionAction functionAction = new FunctionAction();
        functionAction.setAction(FunctionAction.Action.START);
        functionAction.setFunctionRuntimeInfo(functionRuntimeInfo);
        try {
            this.actionQueue.put(functionAction);
        }
        catch (InterruptedException ex) {
            throw new RuntimeException("Interrupted while putting action");
        }
    }

    private Function.Assignment findAssignment(String tenant, String namespace, String functionName, int instanceId) {
        String fullyQualifiedInstanceId = Utils.getFullyQualifiedInstanceId(tenant, namespace, functionName, instanceId);
        for (Map.Entry<String, Map<String, Function.Assignment>> entry : this.workerIdToAssignments.entrySet()) {
            Map<String, Function.Assignment> assignmentMap = entry.getValue();
            Function.Assignment existingAssignment = assignmentMap.get(fullyQualifiedInstanceId);
            if (existingAssignment == null) continue;
            return existingAssignment;
        }
        return null;
    }

    private Function.Assignment findAssignment(Function.Assignment assignment) {
        return this.findAssignment(assignment.getInstance().getFunctionMetaData().getFunctionDetails().getTenant(), assignment.getInstance().getFunctionMetaData().getFunctionDetails().getNamespace(), assignment.getInstance().getFunctionMetaData().getFunctionDetails().getName(), assignment.getInstance().getInstanceId());
    }

    @VisibleForTesting
    void setAssignment(Function.Assignment assignment) {
        if (!this.workerIdToAssignments.containsKey(assignment.getWorkerId())) {
            this.workerIdToAssignments.put(assignment.getWorkerId(), new HashMap());
        }
        this.workerIdToAssignments.get(assignment.getWorkerId()).put(Utils.getFullyQualifiedInstanceId(assignment.getInstance()), assignment);
    }

    @VisibleForTesting
    void deleteAssignment(Function.Assignment assignment) {
        Map<String, Function.Assignment> assignmentMap = this.workerIdToAssignments.get(assignment.getWorkerId());
        if (assignmentMap != null) {
            String fullyQualifiedInstanceId = Utils.getFullyQualifiedInstanceId(assignment.getInstance());
            if (assignmentMap.containsKey(fullyQualifiedInstanceId)) {
                assignmentMap.remove(fullyQualifiedInstanceId);
            }
            if (assignmentMap.isEmpty()) {
                this.workerIdToAssignments.remove(assignment.getWorkerId());
            }
        }
    }

    private void deleteFunctionRuntimeInfo(String fullyQualifiedInstanceId) {
        this.functionRuntimeInfoMap.remove(fullyQualifiedInstanceId);
    }

    private void setFunctionRuntimeInfo(String fullyQualifiedInstanceId, FunctionRuntimeInfo functionRuntimeInfo) {
        this.functionRuntimeInfoMap.put(fullyQualifiedInstanceId, functionRuntimeInfo);
    }

    @Override
    public void close() throws Exception {
        this.functionActioner.close();
        this.functionAssignmentTailer.close();
        if (this.runtimeFactory != null) {
            this.runtimeFactory.close();
        }
    }

    private Map<String, Function.Assignment> diff(Map<String, Function.Assignment> assignmentMap1, Map<String, Function.Assignment> assignmentMap2) {
        HashMap<String, Function.Assignment> result = new HashMap<String, Function.Assignment>();
        for (Map.Entry<String, Function.Assignment> entry : assignmentMap1.entrySet()) {
            if (assignmentMap2.containsKey(entry.getKey())) continue;
            result.put(entry.getKey(), entry.getValue());
        }
        return result;
    }

    private Map<String, Function.Assignment> inCommon(Map<String, Function.Assignment> assignmentMap1, Map<String, Function.Assignment> assignmentMap2) {
        HashMap<String, Function.Assignment> result = new HashMap<String, Function.Assignment>();
        for (Map.Entry<String, Function.Assignment> entry : assignmentMap1.entrySet()) {
            if (!assignmentMap2.containsKey(entry.getKey())) continue;
            result.put(entry.getKey(), entry.getValue());
        }
        return result;
    }

    private FunctionRuntimeInfo getFunctionRuntimeInfo(String fullyQualifiedInstanceId) {
        return this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId);
    }
}

