package org.apache.pulsar.functions.worker;

import com.google.common.annotations.VisibleForTesting;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerEventListener;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/functions/worker/MembershipManager.class */
public class MembershipManager implements AutoCloseable, ConsumerEventListener {
    private final String consumerName;
    private final Consumer<byte[]> consumer;
    private final WorkerConfig workerConfig;
    private PulsarAdmin pulsarAdminClient;
    static final String COORDINATION_TOPIC_SUBSCRIPTION = "participants";
    private static final Logger log = LoggerFactory.getLogger(MembershipManager.class);
    private static String WORKER_IDENTIFIER = "id";
    private final AtomicBoolean isLeader = new AtomicBoolean();

    @VisibleForTesting
    Map<Function.Instance, Long> unsignedFunctionDurations = new HashMap();
    private final CompletableFuture<Void> firstConsumerEventFuture = new CompletableFuture<>();

    /* loaded from: input_file:org/apache/pulsar/functions/worker/MembershipManager$WorkerInfo.class */
    public static class WorkerInfo {
        private String workerId;
        private String workerHostname;
        private int port;

        public static WorkerInfo of(String str, String str2, int i) {
            return new WorkerInfo(str, str2, i);
        }

        public static WorkerInfo parseFrom(String str) {
            String[] split = str.split(":");
            if (split.length != 3) {
                throw new IllegalArgumentException("Invalid string to parse WorkerInfo : " + str);
            }
            return new WorkerInfo(split[0], split[1], Integer.parseInt(split[2]));
        }

        public String getWorkerId() {
            return this.workerId;
        }

        public String getWorkerHostname() {
            return this.workerHostname;
        }

        public int getPort() {
            return this.port;
        }

        private WorkerInfo(String str, String str2, int i) {
            this.workerId = str;
            this.workerHostname = str2;
            this.port = i;
        }

        public String toString() {
            return "MembershipManager.WorkerInfo(workerId=" + getWorkerId() + ", workerHostname=" + getWorkerHostname() + ", port=" + getPort() + ")";
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public MembershipManager(WorkerConfig workerConfig, PulsarClient pulsarClient) throws PulsarClientException {
        this.workerConfig = workerConfig;
        this.consumerName = String.format("%s:%s:%d", workerConfig.getWorkerId(), workerConfig.getWorkerHostname(), Integer.valueOf(workerConfig.getWorkerPort()));
        this.consumer = pulsarClient.newConsumer().topic(new String[]{workerConfig.getClusterCoordinationTopic()}).subscriptionName(COORDINATION_TOPIC_SUBSCRIPTION).subscriptionType(SubscriptionType.Failover).consumerEventListener(this).property(WORKER_IDENTIFIER, this.consumerName).subscribe();
    }

    public void becameActive(Consumer<?> consumer, int i) {
        this.firstConsumerEventFuture.complete(null);
        if (this.isLeader.compareAndSet(false, true)) {
            log.info("Worker {} became the leader.", this.consumerName);
        }
    }

    public void becameInactive(Consumer<?> consumer, int i) {
        this.firstConsumerEventFuture.complete(null);
        if (this.isLeader.compareAndSet(true, false)) {
            log.info("Worker {} lost the leadership.", this.consumerName);
        }
    }

    public boolean isLeader() {
        return this.isLeader.get();
    }

    public List<WorkerInfo> getCurrentMembership() {
        LinkedList linkedList = new LinkedList();
        try {
            Iterator it = ((SubscriptionStats) getPulsarAdminClient().topics().getStats(this.workerConfig.getClusterCoordinationTopic()).subscriptions.get(COORDINATION_TOPIC_SUBSCRIPTION)).consumers.iterator();
            while (it.hasNext()) {
                linkedList.add(WorkerInfo.parseFrom((String) ((ConsumerStats) it.next()).metadata.get(WORKER_IDENTIFIER)));
            }
            return linkedList;
        } catch (PulsarAdminException e) {
            log.error("Failed to get status of coordinate topic {}", this.workerConfig.getClusterCoordinationTopic(), e);
            throw new RuntimeException((Throwable) e);
        }
    }

    @Override // java.lang.AutoCloseable
    public void close() throws PulsarClientException {
        this.consumer.close();
        if (this.pulsarAdminClient != null) {
            this.pulsarAdminClient.close();
        }
    }

    public void checkFailures(FunctionMetaDataManager functionMetaDataManager, FunctionRuntimeManager functionRuntimeManager, SchedulerManager schedulerManager) {
        Set set = (Set) getCurrentMembership().stream().map(workerInfo -> {
            return workerInfo.getWorkerId();
        }).collect(Collectors.toSet());
        List<Function.FunctionMetaData> allFunctionMetaData = functionMetaDataManager.getAllFunctionMetaData();
        HashMap hashMap = new HashMap();
        for (Function.FunctionMetaData functionMetaData : allFunctionMetaData) {
            hashMap.put(FunctionDetailsUtils.getFullyQualifiedName(functionMetaData.getFunctionDetails()), functionMetaData);
        }
        Map<String, Map<String, Function.Assignment>> currentAssignments = functionRuntimeManager.getCurrentAssignments();
        HashMap hashMap2 = new HashMap();
        Iterator<Map<String, Function.Assignment>> it = currentAssignments.values().iterator();
        while (it.hasNext()) {
            hashMap2.putAll(it.next());
        }
        long currentTimeMillis = System.currentTimeMillis();
        Iterator<Map.Entry<Function.Instance, Long>> it2 = this.unsignedFunctionDurations.entrySet().iterator();
        while (it2.hasNext()) {
            Map.Entry<Function.Instance, Long> next = it2.next();
            String fullyQualifiedName = FunctionDetailsUtils.getFullyQualifiedName(next.getKey().getFunctionMetaData().getFunctionDetails());
            String fullyQualifiedInstanceId = Utils.getFullyQualifiedInstanceId(next.getKey());
            if (hashMap.containsKey(fullyQualifiedName)) {
                Function.Assignment assignment = (Function.Assignment) hashMap2.get(fullyQualifiedInstanceId);
                if (assignment != null && set.contains(assignment.getWorkerId())) {
                    it2.remove();
                }
            } else {
                it2.remove();
            }
        }
        for (Function.FunctionMetaData functionMetaData2 : allFunctionMetaData) {
            Set set2 = (Set) FunctionRuntimeManager.findFunctionAssignments(functionMetaData2.getFunctionDetails().getTenant(), functionMetaData2.getFunctionDetails().getNamespace(), functionMetaData2.getFunctionDetails().getName(), currentAssignments).stream().map(assignment2 -> {
                return assignment2.getInstance();
            }).collect(Collectors.toSet());
            for (Function.Instance instance : new HashSet(SchedulerManager.computeInstances(functionMetaData2))) {
                if (!set2.contains(instance) && !this.unsignedFunctionDurations.containsKey(instance)) {
                    this.unsignedFunctionDurations.put(instance, Long.valueOf(currentTimeMillis));
                }
            }
        }
        for (Map.Entry<String, Map<String, Function.Assignment>> entry : currentAssignments.entrySet()) {
            String key = entry.getKey();
            Map<String, Function.Assignment> value = entry.getValue();
            if (!set.contains(key)) {
                Iterator<Function.Assignment> it3 = value.values().iterator();
                while (it3.hasNext()) {
                    Function.Instance assignment3 = it3.next().getInstance();
                    if (!this.unsignedFunctionDurations.containsKey(assignment3)) {
                        this.unsignedFunctionDurations.put(assignment3, Long.valueOf(currentTimeMillis));
                    }
                }
            }
        }
        boolean z = false;
        LinkedList linkedList = new LinkedList();
        LinkedList linkedList2 = new LinkedList();
        for (Map.Entry<Function.Instance, Long> entry2 : this.unsignedFunctionDurations.entrySet()) {
            Function.Instance key2 = entry2.getKey();
            if (currentTimeMillis - entry2.getValue().longValue() > this.workerConfig.getRescheduleTimeoutMs()) {
                linkedList.add(key2);
                Function.Assignment assignment4 = (Function.Assignment) hashMap2.get(Utils.getFullyQualifiedInstanceId(key2));
                if (assignment4 != null) {
                    linkedList2.add(assignment4);
                }
                z = true;
            }
        }
        if (!linkedList2.isEmpty()) {
            functionRuntimeManager.removeAssignments(linkedList2);
        }
        if (z) {
            log.info("Functions that need scheduling/rescheduling: {}", linkedList);
            schedulerManager.schedule();
        }
    }

    private PulsarAdmin getPulsarAdminClient() {
        if (this.pulsarAdminClient == null) {
            this.pulsarAdminClient = Utils.getPulsarAdminClient(this.workerConfig.getPulsarWebServiceUrl());
        }
        return this.pulsarAdminClient;
    }
}
