/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.worker;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Request;
import org.apache.pulsar.functions.utils.Reflections;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
import org.apache.pulsar.functions.worker.MembershipManager;
import org.apache.pulsar.functions.worker.Utils;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.scheduler.IScheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Computes function-instance assignments and publishes them to the function
 * assignment topic.
 *
 * <p>Scheduling requests are submitted through {@link #schedule()} and run on a
 * dedicated executor with exactly one thread, so at most one scheduling pass
 * executes at a time. A pass only does work when
 * {@link MembershipManager#isLeader()} returns {@code true}.
 *
 * <p>The collaborating managers are injected through setters after construction;
 * they must be set before {@link #schedule()} is called, because the scheduling
 * pass dereferences them without a null check.
 */
public class SchedulerManager implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(SchedulerManager.class);

    /** Delay between polls while waiting for a published assignment version to become current. */
    private static final long ASSIGNMENT_PROPAGATION_POLL_MS = 500L;

    private final WorkerConfig workerConfig;
    private FunctionMetaDataManager functionMetaDataManager;
    private MembershipManager membershipManager;
    private FunctionRuntimeManager functionRuntimeManager;
    private final IScheduler scheduler;
    private final Producer<byte[]> producer;
    private final ExecutorService executorService;

    /**
     * Creates the scheduler named by {@link WorkerConfig#getSchedulerClassName()},
     * a producer on the function assignment topic, and the single-threaded
     * executor that runs scheduling passes.
     *
     * @param workerConfig worker configuration supplying the scheduler class name,
     *                     the assignment topic and the propagation retry limit
     * @param pulsarClient client used to create the assignment-topic producer
     * @throws RuntimeException wrapping the {@link PulsarClientException} if the
     *                          producer cannot be created
     */
    public SchedulerManager(WorkerConfig workerConfig, PulsarClient pulsarClient) {
        this.workerConfig = workerConfig;
        this.scheduler = Reflections.createInstance(
                workerConfig.getSchedulerClassName(),
                IScheduler.class,
                Thread.currentThread().getContextClassLoader());
        try {
            // NOTE(review): a send timeout of 0 is documented by the Pulsar client as
            // "no timeout" -- confirm against the client version in use.
            this.producer = pulsarClient.newProducer()
                    .topic(this.workerConfig.getFunctionAssignmentTopic())
                    .enableBatching(true)
                    .blockIfQueueFull(true)
                    .compressionType(CompressionType.LZ4)
                    .sendTimeout(0, TimeUnit.MILLISECONDS)
                    .create();
        } catch (PulsarClientException e) {
            log.error("Failed to create producer to function assignment topic {}",
                    this.workerConfig.getFunctionAssignmentTopic(), e);
            throw new RuntimeException(e);
        }
        // One core thread, one max thread, unbounded queue: passes are serialized.
        this.executorService = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    }

    /**
     * Queues a scheduling pass on the internal executor.
     *
     * <p>When the pass runs it checks leadership and, only if this worker is the
     * leader, recomputes and publishes assignments.
     *
     * @return a future that completes when the queued pass finishes; it completes
     *         exceptionally if the pass throws
     */
    public Future<?> schedule() {
        return this.executorService.submit(() -> {
            synchronized (this) {
                if (this.membershipManager.isLeader()) {
                    this.invokeScheduler();
                }
            }
        });
    }

    /**
     * Runs one full scheduling pass: reconciles current assignments against the
     * known function metadata, asks the scheduler for new assignments, publishes
     * them under the next assignment version, and waits (bounded) for that
     * version to become current.
     */
    private void invokeScheduler() {
        List<String> currentMembership = this.membershipManager.getCurrentMembership().stream()
                .map(workerInfo -> workerInfo.getWorkerId())
                .collect(Collectors.toList());

        List<Function.FunctionMetaData> allFunctions = this.functionMetaDataManager.getAllFunctionMetaData();
        Map<String, Function.Instance> allInstances = SchedulerManager.computeAllInstances(allFunctions);

        Map<String, Map<String, Function.Assignment>> workerIdToAssignments =
                this.functionRuntimeManager.getCurrentAssignments();
        SchedulerManager.reconcileAssignments(workerIdToAssignments, allInstances);

        List<Function.Assignment> currentAssignments = workerIdToAssignments.values().stream()
                .flatMap(assignmentsOfWorker -> assignmentsOfWorker.values().stream())
                .collect(Collectors.toList());
        List<Function.Instance> needsAssignment =
                this.getUnassignedFunctionInstances(workerIdToAssignments, allInstances);

        List<Function.Assignment> assignments =
                this.scheduler.schedule(needsAssignment, currentAssignments, currentMembership);
        log.debug("New assignments computed: {}", assignments);

        long assignmentVersion = this.functionRuntimeManager.getCurrentAssignmentVersion() + 1L;
        this.publishAssignments(assignmentVersion, assignments);
        this.awaitAssignmentPropagation(assignmentVersion);
    }

    /**
     * Brings the per-worker assignment maps in line with the set of instances
     * that should currently exist. Mutates {@code workerIdToAssignments} in place.
     *
     * <ul>
     *   <li>Assignments whose instance id is not in {@code allInstances} are removed.</li>
     *   <li>Assignments whose stored instance differs from the one in
     *       {@code allInstances} are rebuilt with the current instance.</li>
     *   <li>Workers left with no assignments are removed from the outer map.</li>
     * </ul>
     */
    private static void reconcileAssignments(Map<String, Map<String, Function.Assignment>> workerIdToAssignments,
                                             Map<String, Function.Instance> allInstances) {
        Iterator<Map<String, Function.Assignment>> workers = workerIdToAssignments.values().iterator();
        while (workers.hasNext()) {
            Map<String, Function.Assignment> functionMap = workers.next();
            functionMap.keySet().removeIf(fullyQualifiedInstanceId -> !allInstances.containsKey(fullyQualifiedInstanceId));
            for (Map.Entry<String, Function.Assignment> entry : functionMap.entrySet()) {
                Function.Assignment assignment = entry.getValue();
                Function.Instance instance = allInstances.get(entry.getKey());
                if (!assignment.getInstance().equals(instance)) {
                    // Update through the entry rather than Map.put so the map is
                    // never modified behind the back of the active iterator.
                    entry.setValue(assignment.toBuilder().setInstance(instance).build());
                }
            }
            if (functionMap.isEmpty()) {
                workers.remove();
            }
        }
    }

    /**
     * Serializes the assignments into an {@code AssignmentsUpdate} carrying the
     * given version and blocks until the producer acknowledges the send.
     *
     * @throws RuntimeException if the send fails or the thread is interrupted
     *                          while waiting; on interruption the thread's
     *                          interrupt flag is restored before throwing
     */
    private void publishAssignments(long assignmentVersion, List<Function.Assignment> assignments) {
        Request.AssignmentsUpdate assignmentsUpdate = Request.AssignmentsUpdate.newBuilder()
                .setVersion(assignmentVersion)
                .addAllAssignments(assignments)
                .build();
        try {
            this.producer.sendAsync(assignmentsUpdate.toByteArray()).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("Failed to send assignment update", e);
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            log.error("Failed to send assignment update", e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Polls the runtime manager until its current assignment version reaches
     * {@code assignmentVersion}, sleeping between polls. Gives up with a warning
     * (without throwing) after {@link WorkerConfig#getAssignmentWriteMaxRetries()}
     * polls.
     *
     * @throws RuntimeException if interrupted while sleeping; the thread's
     *                          interrupt flag is restored before throwing
     */
    private void awaitAssignmentPropagation(long assignmentVersion) {
        int retries = 0;
        while (this.functionRuntimeManager.getCurrentAssignmentVersion() < assignmentVersion) {
            if (retries >= this.workerConfig.getAssignmentWriteMaxRetries()) {
                log.warn("Max number of retries reached for waiting for assignment to propagate. Will continue now.");
                break;
            }
            log.info("Waiting for assignments to propagate...");
            try {
                Thread.sleep(ASSIGNMENT_PROPAGATION_POLL_MS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            ++retries;
        }
    }

    /**
     * Expands every function into its instances and indexes them by fully
     * qualified instance id.
     *
     * @param allFunctions metadata of all functions to expand
     * @return a new mutable map from fully qualified instance id to instance
     */
    public static Map<String, Function.Instance> computeAllInstances(List<Function.FunctionMetaData> allFunctions) {
        Map<String, Function.Instance> functionInstances = new HashMap<>();
        for (Function.FunctionMetaData functionMetaData : allFunctions) {
            for (Function.Instance instance : SchedulerManager.computeInstances(functionMetaData)) {
                functionInstances.put(Utils.getFullyQualifiedInstanceId(instance), instance);
            }
        }
        return functionInstances;
    }

    /**
     * Builds one instance per unit of parallelism declared in the function
     * details, with instance ids {@code 0 .. parallelism - 1}.
     *
     * @param functionMetaData metadata of the function to expand
     * @return a new mutable list of instances; empty when parallelism is not positive
     */
    public static List<Function.Instance> computeInstances(Function.FunctionMetaData functionMetaData) {
        int instances = functionMetaData.getFunctionDetails().getParallelism();
        // Math.max guards the presize against a negative parallelism value.
        List<Function.Instance> functionInstances = new ArrayList<>(Math.max(instances, 0));
        for (int i = 0; i < instances; ++i) {
            functionInstances.add(Function.Instance.newBuilder()
                    .setFunctionMetaData(functionMetaData)
                    .setInstanceId(i)
                    .build());
        }
        return functionInstances;
    }

    /**
     * Returns the instances from {@code functionInstances} whose fully qualified
     * id does not appear as a key in any worker's assignment map.
     *
     * @param currentAssignments worker id to (instance id to assignment)
     * @param functionInstances  instance id to instance, for all expected instances
     * @return a new list of the instances that have no assignment
     */
    private List<Function.Instance> getUnassignedFunctionInstances(Map<String, Map<String, Function.Assignment>> currentAssignments, Map<String, Function.Instance> functionInstances) {
        Map<String, Function.Assignment> assignmentMap = new HashMap<>();
        for (Map<String, Function.Assignment> assignmentsOfWorker : currentAssignments.values()) {
            assignmentMap.putAll(assignmentsOfWorker);
        }
        List<Function.Instance> unassignedFunctionInstances = new ArrayList<>();
        for (Map.Entry<String, Function.Instance> entry : functionInstances.entrySet()) {
            if (!assignmentMap.containsKey(entry.getKey())) {
                unassignedFunctionInstances.add(entry.getValue());
            }
        }
        return unassignedFunctionInstances;
    }

    /**
     * Closes the assignment producer and shuts down the scheduling executor.
     *
     * <p>A {@link PulsarClientException} from closing the producer is logged and
     * suppressed. The executor is shut down even if closing the producer throws.
     */
    @Override
    public void close() {
        try {
            this.producer.close();
        } catch (PulsarClientException e) {
            log.warn("Failed to shutdown scheduler manager assignment producer", e);
        } finally {
            this.executorService.shutdown();
        }
    }

    /** Sets the source of function metadata read during a scheduling pass. */
    public void setFunctionMetaDataManager(FunctionMetaDataManager functionMetaDataManager) {
        this.functionMetaDataManager = functionMetaDataManager;
    }

    /** Sets the manager consulted for leadership and current worker membership. */
    public void setMembershipManager(MembershipManager membershipManager) {
        this.membershipManager = membershipManager;
    }

    /** Sets the manager consulted for current assignments and the assignment version. */
    public void setFunctionRuntimeManager(FunctionRuntimeManager functionRuntimeManager) {
        this.functionRuntimeManager = functionRuntimeManager;
    }
}

