/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.worker;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.utils.Actions;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
import org.apache.pulsar.functions.worker.MembershipManager;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.scheduler.IScheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SchedulerManager
implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(SchedulerManager.class);
    private final WorkerConfig workerConfig;
    private FunctionMetaDataManager functionMetaDataManager;
    private MembershipManager membershipManager;
    private FunctionRuntimeManager functionRuntimeManager;
    private final IScheduler scheduler;
    private final Producer<byte[]> producer;
    private final ScheduledExecutorService executorService;
    private final PulsarAdmin admin;
    AtomicBoolean isCompactionNeeded = new AtomicBoolean(false);
    private static final long DEFAULT_ADMIN_API_BACKOFF_SEC = 60L;
    public static final String HEARTBEAT_TENANT = "pulsar-function";
    public static final String HEARTBEAT_NAMESPACE = "heartbeat";

    public SchedulerManager(WorkerConfig workerConfig, PulsarClient pulsarClient, PulsarAdmin admin, ScheduledExecutorService executor) {
        this.workerConfig = workerConfig;
        this.admin = admin;
        this.scheduler = (IScheduler)Reflections.createInstance((String)workerConfig.getSchedulerClassName(), IScheduler.class, (ClassLoader)Thread.currentThread().getContextClassLoader());
        this.producer = SchedulerManager.createProducer(pulsarClient, workerConfig);
        this.executorService = executor;
        this.scheduleCompaction(executor, workerConfig.getTopicCompactionFrequencySec());
    }

    private static Producer<byte[]> createProducer(PulsarClient client, WorkerConfig config) {
        Actions.Action createProducerAction = Actions.Action.builder().actionName(String.format("Creating producer for assignment topic %s", config.getFunctionAssignmentTopic())).numRetries(5).sleepBetweenInvocationsMs(10000L).supplier(() -> {
            try {
                Producer producer = (Producer)client.newProducer().topic(config.getFunctionAssignmentTopic()).enableBatching(false).blockIfQueueFull(true).compressionType(CompressionType.LZ4).sendTimeout(0, TimeUnit.MILLISECONDS).producerName(config.getWorkerId() + "-scheduler-manager").createAsync().get(10L, TimeUnit.SECONDS);
                return Actions.ActionResult.builder().success(true).result((Object)producer).build();
            }
            catch (Exception e) {
                log.error("Exception while at creating producer to topic {}", (Object)config.getFunctionAssignmentTopic(), (Object)e);
                return Actions.ActionResult.builder().success(false).build();
            }
        }).build();
        AtomicReference producer = new AtomicReference();
        try {
            Actions.newBuilder().addAction(createProducerAction.toBuilder().onSuccess(actionResult -> producer.set((Producer)actionResult.getResult())).build()).run();
        }
        catch (InterruptedException e) {
            log.error("Interrupted at creating producer to topic {}", (Object)config.getFunctionAssignmentTopic(), (Object)e);
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        if (producer.get() == null) {
            throw new RuntimeException("Can't create a producer on assignment topic " + config.getFunctionAssignmentTopic());
        }
        return (Producer)producer.get();
    }

    public Future<?> schedule() {
        return this.executorService.submit(() -> {
            SchedulerManager schedulerManager = this;
            synchronized (schedulerManager) {
                boolean isLeader = this.membershipManager.isLeader();
                if (isLeader) {
                    try {
                        this.invokeScheduler();
                    }
                    catch (Exception e) {
                        log.warn("Failed to invoke scheduler", (Throwable)e);
                        throw e;
                    }
                }
            }
        });
    }

    private void scheduleCompaction(ScheduledExecutorService executor, long scheduleFrequencySec) {
        if (executor != null) {
            executor.scheduleWithFixedDelay(() -> {
                if (this.membershipManager.isLeader() && this.isCompactionNeeded.get()) {
                    this.compactAssignmentTopic();
                    this.isCompactionNeeded.set(false);
                }
            }, scheduleFrequencySec, scheduleFrequencySec, TimeUnit.SECONDS);
        }
    }

    @VisibleForTesting
    public void invokeScheduler() {
        Set<String> currentMembership = this.membershipManager.getCurrentMembership().stream().map(workerInfo -> workerInfo.getWorkerId()).collect(Collectors.toSet());
        List<Function.FunctionMetaData> allFunctions = this.functionMetaDataManager.getAllFunctionMetaData();
        Map<String, Function.Instance> allInstances = SchedulerManager.computeAllInstances(allFunctions, this.functionRuntimeManager.getRuntimeFactory().externallyManaged());
        Map<String, Map<String, Function.Assignment>> workerIdToAssignments = this.functionRuntimeManager.getCurrentAssignments();
        Iterator<Map.Entry<String, Map<String, Function.Assignment>>> it = workerIdToAssignments.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Map<String, Function.Assignment>> workerIdToAssignmentEntry2 = it.next();
            Map<String, Function.Assignment> functionMap = workerIdToAssignmentEntry2.getValue();
            functionMap.entrySet().removeIf(entry -> {
                boolean deleted;
                String fullyQualifiedInstanceId = (String)entry.getKey();
                boolean bl = deleted = !allInstances.containsKey(fullyQualifiedInstanceId);
                if (deleted) {
                    this.publishNewAssignment(((Function.Assignment)entry.getValue()).toBuilder().build(), true);
                }
                return deleted;
            });
            for (Map.Entry<String, Function.Assignment> entry2 : functionMap.entrySet()) {
                String fullyQualifiedInstanceId = entry2.getKey();
                Function.Assignment assignment = entry2.getValue();
                Function.Instance instance = allInstances.get(fullyQualifiedInstanceId);
                if (assignment.getInstance().equals((Object)instance)) continue;
                functionMap.put(fullyQualifiedInstanceId, assignment.toBuilder().setInstance(instance).build());
                this.publishNewAssignment(assignment.toBuilder().setInstance(instance).build().toBuilder().build(), false);
            }
            if (!functionMap.isEmpty()) continue;
            it.remove();
        }
        List<Function.Assignment> currentAssignments = workerIdToAssignments.entrySet().stream().filter(workerIdToAssignmentEntry -> {
            String workerId = (String)workerIdToAssignmentEntry.getKey();
            return currentMembership.contains(workerId);
        }).flatMap(stringMapEntry -> ((Map)stringMapEntry.getValue()).values().stream()).collect(Collectors.toList());
        Pair<List<Function.Instance>, List<Function.Assignment>> unassignedInstances = this.getUnassignedFunctionInstances(workerIdToAssignments, allInstances);
        List<Function.Assignment> assignments = this.scheduler.schedule((List)unassignedInstances.getLeft(), currentAssignments, currentMembership);
        assignments.addAll((Collection)unassignedInstances.getRight());
        if (log.isDebugEnabled()) {
            log.debug("New assignments computed: {}", assignments);
        }
        this.isCompactionNeeded.set(!assignments.isEmpty());
        for (Function.Assignment assignment : assignments) {
            this.publishNewAssignment(assignment, false);
        }
    }

    public void compactAssignmentTopic() {
        if (this.admin != null) {
            try {
                this.admin.topics().triggerCompaction(this.workerConfig.getFunctionAssignmentTopic());
            }
            catch (PulsarAdminException e) {
                log.error("Failed to trigger compaction", (Throwable)e);
                this.executorService.schedule(() -> this.compactAssignmentTopic(), 60L, TimeUnit.SECONDS);
            }
        }
    }

    private void publishNewAssignment(Function.Assignment assignment, boolean deleted) {
        try {
            String fullyQualifiedInstanceId = FunctionCommon.getFullyQualifiedInstanceId((Function.Instance)assignment.getInstance());
            this.producer.newMessage().key(fullyQualifiedInstanceId).value((Object)(deleted ? "".getBytes() : assignment.toByteArray())).sendAsync().get();
        }
        catch (Exception e) {
            log.error("Failed to {} assignment update {}", new Object[]{assignment, deleted ? "send" : "deleted", e});
            throw new RuntimeException(e);
        }
    }

    public static Map<String, Function.Instance> computeAllInstances(List<Function.FunctionMetaData> allFunctions, boolean externallyManagedRuntime) {
        HashMap<String, Function.Instance> functionInstances = new HashMap<String, Function.Instance>();
        for (Function.FunctionMetaData functionMetaData : allFunctions) {
            for (Function.Instance instance : SchedulerManager.computeInstances(functionMetaData, externallyManagedRuntime)) {
                functionInstances.put(FunctionCommon.getFullyQualifiedInstanceId((Function.Instance)instance), instance);
            }
        }
        return functionInstances;
    }

    public static List<Function.Instance> computeInstances(Function.FunctionMetaData functionMetaData, boolean externallyManagedRuntime) {
        LinkedList<Function.Instance> functionInstances = new LinkedList<Function.Instance>();
        if (!externallyManagedRuntime) {
            int instances = functionMetaData.getFunctionDetails().getParallelism();
            for (int i = 0; i < instances; ++i) {
                functionInstances.add(Function.Instance.newBuilder().setFunctionMetaData(functionMetaData).setInstanceId(i).build());
            }
        } else {
            functionInstances.add(Function.Instance.newBuilder().setFunctionMetaData(functionMetaData).setInstanceId(-1).build());
        }
        return functionInstances;
    }

    private Pair<List<Function.Instance>, List<Function.Assignment>> getUnassignedFunctionInstances(Map<String, Map<String, Function.Assignment>> currentAssignments, Map<String, Function.Instance> functionInstances) {
        LinkedList<Function.Instance> unassignedFunctionInstances = new LinkedList<Function.Instance>();
        ArrayList heartBeatAssignments = Lists.newArrayList();
        HashMap<String, Function.Assignment> assignmentMap = new HashMap<String, Function.Assignment>();
        for (Map<String, Function.Assignment> map : currentAssignments.values()) {
            assignmentMap.putAll(map);
        }
        for (Map.Entry entry : functionInstances.entrySet()) {
            String fullyQualifiedInstanceId = (String)entry.getKey();
            Function.Instance instance = (Function.Instance)entry.getValue();
            String heartBeatWorkerId = SchedulerManager.checkHeartBeatFunction(instance);
            if (heartBeatWorkerId != null) {
                heartBeatAssignments.add(Function.Assignment.newBuilder().setInstance(instance).setWorkerId(heartBeatWorkerId).build());
                continue;
            }
            if (assignmentMap.containsKey(fullyQualifiedInstanceId)) continue;
            unassignedFunctionInstances.add(instance);
        }
        return ImmutablePair.of(unassignedFunctionInstances, (Object)heartBeatAssignments);
    }

    @Override
    public void close() {
        try {
            this.producer.close();
        }
        catch (PulsarClientException e) {
            log.warn("Failed to shutdown scheduler manager assignment producer", (Throwable)e);
        }
    }

    public static String checkHeartBeatFunction(Function.Instance funInstance) {
        if (funInstance.getFunctionMetaData() != null && funInstance.getFunctionMetaData().getFunctionDetails() != null) {
            Function.FunctionDetails funDetails = funInstance.getFunctionMetaData().getFunctionDetails();
            return HEARTBEAT_TENANT.equals(funDetails.getTenant()) && HEARTBEAT_NAMESPACE.equals(funDetails.getNamespace()) ? funDetails.getName() : null;
        }
        return null;
    }

    public void setFunctionMetaDataManager(FunctionMetaDataManager functionMetaDataManager) {
        this.functionMetaDataManager = functionMetaDataManager;
    }

    public void setMembershipManager(MembershipManager membershipManager) {
        this.membershipManager = membershipManager;
    }

    public void setFunctionRuntimeManager(FunctionRuntimeManager functionRuntimeManager) {
        this.functionRuntimeManager = functionRuntimeManager;
    }
}

