package org.apache.pulsar.functions.worker;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.utils.Reflections;
import org.apache.pulsar.functions.worker.scheduler.IScheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/functions/worker/SchedulerManager.class */
public class SchedulerManager implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(SchedulerManager.class);
    private final WorkerConfig workerConfig;
    private FunctionMetaDataManager functionMetaDataManager;
    private MembershipManager membershipManager;
    private FunctionRuntimeManager functionRuntimeManager;
    private final IScheduler scheduler;
    private final Producer<byte[]> producer;
    private final ScheduledExecutorService executorService;
    private final PulsarAdmin admin;
    AtomicBoolean isCompactionNeeded = new AtomicBoolean(false);
    private static final long DEFAULT_ADMIN_API_BACKOFF_SEC = 60;
    public static final String HEARTBEAT_TENANT = "pulsar-function";
    public static final String HEARTBEAT_NAMESPACE = "heartbeat";

    public SchedulerManager(WorkerConfig workerConfig, PulsarClient pulsarClient, PulsarAdmin pulsarAdmin, ScheduledExecutorService scheduledExecutorService) {
        this.workerConfig = workerConfig;
        this.admin = pulsarAdmin;
        this.scheduler = (IScheduler) Reflections.createInstance(workerConfig.getSchedulerClassName(), IScheduler.class, Thread.currentThread().getContextClassLoader());
        this.producer = createProducer(pulsarClient, workerConfig);
        this.executorService = scheduledExecutorService;
        scheduleCompaction(scheduledExecutorService, workerConfig.getTopicCompactionFrequencySec());
    }

    private static Producer<byte[]> createProducer(PulsarClient pulsarClient, WorkerConfig workerConfig) {
        Stopwatch createStarted = Stopwatch.createStarted();
        for (int i = 0; i < 6; i++) {
            try {
                return (Producer) pulsarClient.newProducer().topic(workerConfig.getFunctionAssignmentTopic()).enableBatching(false).blockIfQueueFull(true).compressionType(CompressionType.LZ4).sendTimeout(0, TimeUnit.MILLISECONDS).createAsync().get(10L, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                log.error("Interrupted at creating producer to topic {}", workerConfig.getFunctionAssignmentTopic(), e);
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            } catch (ExecutionException e2) {
                log.error("Encountered exceptions at creating producer for topic {}", workerConfig.getFunctionAssignmentTopic(), e2);
                throw new RuntimeException(e2);
            } catch (TimeoutException e3) {
                try {
                    log.info("Can't create a producer on assignment topic {} in {} seconds, retry in 10 seconds ...", Long.valueOf(createStarted.elapsed(TimeUnit.SECONDS)));
                    TimeUnit.SECONDS.sleep(10L);
                } catch (InterruptedException e4) {
                    log.error("Interrupted at creating producer to topic {}", workerConfig.getFunctionAssignmentTopic(), e3);
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e3);
                }
            }
        }
        throw new RuntimeException("Can't create a producer on assignment topic " + workerConfig.getFunctionAssignmentTopic() + " in " + createStarted.elapsed(TimeUnit.SECONDS) + " seconds, fail fast ...");
    }

    public Future<?> schedule() {
        return this.executorService.submit(() -> {
            synchronized (this) {
                if (this.membershipManager.isLeader()) {
                    try {
                        invokeScheduler();
                    } catch (Exception e) {
                        log.warn("Failed to invoke scheduler", e);
                        throw e;
                    }
                }
            }
        });
    }

    private void scheduleCompaction(ScheduledExecutorService scheduledExecutorService, long j) {
        if (scheduledExecutorService != null) {
            scheduledExecutorService.scheduleWithFixedDelay(() -> {
                if (this.membershipManager.isLeader() && this.isCompactionNeeded.get()) {
                    compactAssignmentTopic();
                    this.isCompactionNeeded.set(false);
                }
            }, j, j, TimeUnit.SECONDS);
        }
    }

    @VisibleForTesting
    public void invokeScheduler() {
        Set<String> set = (Set) this.membershipManager.getCurrentMembership().stream().map(workerInfo -> {
            return workerInfo.getWorkerId();
        }).collect(Collectors.toSet());
        Map<String, Function.Instance> computeAllInstances = computeAllInstances(this.functionMetaDataManager.getAllFunctionMetaData(), this.functionRuntimeManager.getRuntimeFactory().externallyManaged());
        Map<String, Map<String, Function.Assignment>> currentAssignments = this.functionRuntimeManager.getCurrentAssignments();
        Iterator<Map.Entry<String, Map<String, Function.Assignment>>> it = currentAssignments.entrySet().iterator();
        while (it.hasNext()) {
            Map<String, Function.Assignment> value = it.next().getValue();
            value.entrySet().removeIf(entry -> {
                boolean z = !computeAllInstances.containsKey((String) entry.getKey());
                if (z) {
                    publishNewAssignment(((Function.Assignment) entry.getValue()).toBuilder().build(), true);
                }
                return z;
            });
            for (Map.Entry<String, Function.Assignment> entry2 : value.entrySet()) {
                String key = entry2.getKey();
                Function.Assignment value2 = entry2.getValue();
                Function.Instance instance = computeAllInstances.get(key);
                if (!value2.getInstance().equals(instance)) {
                    value.put(key, value2.toBuilder().setInstance(instance).build());
                    publishNewAssignment(value2.toBuilder().setInstance(instance).build().toBuilder().build(), false);
                }
            }
            if (value.isEmpty()) {
                it.remove();
            }
        }
        List<Function.Assignment> list = (List) currentAssignments.entrySet().stream().filter(entry3 -> {
            return set.contains((String) entry3.getKey());
        }).flatMap(entry4 -> {
            return ((Map) entry4.getValue()).values().stream();
        }).collect(Collectors.toList());
        Pair<List<Function.Instance>, List<Function.Assignment>> unassignedFunctionInstances = getUnassignedFunctionInstances(currentAssignments, computeAllInstances);
        List<Function.Assignment> schedule = this.scheduler.schedule((List) unassignedFunctionInstances.getLeft(), list, set);
        schedule.addAll((Collection) unassignedFunctionInstances.getRight());
        if (log.isDebugEnabled()) {
            log.debug("New assignments computed: {}", schedule);
        }
        this.isCompactionNeeded.set(!schedule.isEmpty());
        Iterator<Function.Assignment> it2 = schedule.iterator();
        while (it2.hasNext()) {
            publishNewAssignment(it2.next(), false);
        }
    }

    public void compactAssignmentTopic() {
        if (this.admin != null) {
            try {
                this.admin.topics().triggerCompaction(this.workerConfig.getFunctionAssignmentTopic());
            } catch (PulsarAdminException e) {
                log.error("Failed to trigger compaction {}", e);
                this.executorService.schedule(() -> {
                    compactAssignmentTopic();
                }, DEFAULT_ADMIN_API_BACKOFF_SEC, TimeUnit.SECONDS);
            }
        }
    }

    private void publishNewAssignment(Function.Assignment assignment, boolean z) {
        try {
            this.producer.newMessage().key(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance())).value(z ? "".getBytes() : assignment.toByteArray()).sendAsync().get();
        } catch (Exception e) {
            Logger logger = log;
            Object[] objArr = new Object[3];
            objArr[0] = assignment;
            objArr[1] = z ? "send" : "deleted";
            objArr[2] = e;
            logger.error("Failed to {} assignment update {}", objArr);
            throw new RuntimeException(e);
        }
    }

    public static Map<String, Function.Instance> computeAllInstances(List<Function.FunctionMetaData> list, boolean z) {
        HashMap hashMap = new HashMap();
        Iterator<Function.FunctionMetaData> it = list.iterator();
        while (it.hasNext()) {
            for (Function.Instance instance : computeInstances(it.next(), z)) {
                hashMap.put(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(instance), instance);
            }
        }
        return hashMap;
    }

    public static List<Function.Instance> computeInstances(Function.FunctionMetaData functionMetaData, boolean z) {
        LinkedList linkedList = new LinkedList();
        if (z) {
            linkedList.add(Function.Instance.newBuilder().setFunctionMetaData(functionMetaData).setInstanceId(-1).build());
        } else {
            int parallelism = functionMetaData.getFunctionDetails().getParallelism();
            for (int i = 0; i < parallelism; i++) {
                linkedList.add(Function.Instance.newBuilder().setFunctionMetaData(functionMetaData).setInstanceId(i).build());
            }
        }
        return linkedList;
    }

    private Pair<List<Function.Instance>, List<Function.Assignment>> getUnassignedFunctionInstances(Map<String, Map<String, Function.Assignment>> map, Map<String, Function.Instance> map2) {
        LinkedList linkedList = new LinkedList();
        ArrayList newArrayList = Lists.newArrayList();
        HashMap hashMap = new HashMap();
        Iterator<Map<String, Function.Assignment>> it = map.values().iterator();
        while (it.hasNext()) {
            hashMap.putAll(it.next());
        }
        for (Map.Entry<String, Function.Instance> entry : map2.entrySet()) {
            String key = entry.getKey();
            Function.Instance value = entry.getValue();
            String checkHeartBeatFunction = checkHeartBeatFunction(value);
            if (checkHeartBeatFunction != null) {
                newArrayList.add(Function.Assignment.newBuilder().setInstance(value).setWorkerId(checkHeartBeatFunction).build());
            } else if (!hashMap.containsKey(key)) {
                linkedList.add(value);
            }
        }
        return ImmutablePair.of(linkedList, newArrayList);
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        try {
            this.producer.close();
        } catch (PulsarClientException e) {
            log.warn("Failed to shutdown scheduler manager assignment producer", e);
        }
    }

    public static String checkHeartBeatFunction(Function.Instance instance) {
        if (instance.getFunctionMetaData() == null || instance.getFunctionMetaData().getFunctionDetails() == null) {
            return null;
        }
        Function.FunctionDetails functionDetails = instance.getFunctionMetaData().getFunctionDetails();
        if (HEARTBEAT_TENANT.equals(functionDetails.getTenant()) && HEARTBEAT_NAMESPACE.equals(functionDetails.getNamespace())) {
            return functionDetails.getName();
        }
        return null;
    }

    public void setFunctionMetaDataManager(FunctionMetaDataManager functionMetaDataManager) {
        this.functionMetaDataManager = functionMetaDataManager;
    }

    public void setMembershipManager(MembershipManager membershipManager) {
        this.membershipManager = membershipManager;
    }

    public void setFunctionRuntimeManager(FunctionRuntimeManager functionRuntimeManager) {
        this.functionRuntimeManager = functionRuntimeManager;
    }
}
