/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.worker;

import com.google.common.annotations.VisibleForTesting;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerEventListener;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.common.functions.WorkerInfo;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
import org.apache.pulsar.functions.worker.SchedulerManager;
import org.apache.pulsar.functions.worker.Utils;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MembershipManager
implements AutoCloseable,
ConsumerEventListener {
    private static final Logger log = LoggerFactory.getLogger(MembershipManager.class);
    private final String consumerName;
    private final ConsumerImpl<byte[]> consumer;
    private final WorkerConfig workerConfig;
    private PulsarAdmin pulsarAdminClient;
    private final CompletableFuture<Void> firstConsumerEventFuture;
    private final AtomicBoolean isLeader = new AtomicBoolean();
    static final String COORDINATION_TOPIC_SUBSCRIPTION = "participants";
    private static String WORKER_IDENTIFIER = "id";
    @VisibleForTesting
    Map<Function.Instance, Long> unsignedFunctionDurations = new HashMap<Function.Instance, Long>();

    MembershipManager(WorkerService service, PulsarClient client) throws PulsarClientException {
        this.workerConfig = service.getWorkerConfig();
        this.consumerName = String.format("%s:%s:%d", this.workerConfig.getWorkerId(), this.workerConfig.getWorkerHostname(), this.workerConfig.getWorkerPort());
        this.firstConsumerEventFuture = new CompletableFuture();
        this.consumer = (ConsumerImpl)client.newConsumer().topic(new String[]{this.workerConfig.getClusterCoordinationTopic()}).subscriptionName(COORDINATION_TOPIC_SUBSCRIPTION).subscriptionType(SubscriptionType.Failover).consumerEventListener((ConsumerEventListener)this).property(WORKER_IDENTIFIER, this.consumerName).subscribe();
        this.isLeader.set(this.checkLeader(service, this.consumer.getConsumerName()));
    }

    public void becameActive(Consumer<?> consumer, int partitionId) {
        this.firstConsumerEventFuture.complete(null);
        if (this.isLeader.compareAndSet(false, true)) {
            log.info("Worker {} became the leader.", (Object)this.consumerName);
        }
    }

    public void becameInactive(Consumer<?> consumer, int partitionId) {
        this.firstConsumerEventFuture.complete(null);
        if (this.isLeader.compareAndSet(true, false)) {
            log.info("Worker {} lost the leadership.", (Object)this.consumerName);
        }
    }

    public boolean isLeader() {
        return this.isLeader.get();
    }

    public List<WorkerInfo> getCurrentMembership() {
        LinkedList<WorkerInfo> workerIds = new LinkedList<WorkerInfo>();
        TopicStats topicStats = null;
        PulsarAdmin pulsarAdmin = this.getPulsarAdminClient();
        try {
            topicStats = pulsarAdmin.topics().getStats(this.workerConfig.getClusterCoordinationTopic());
        }
        catch (PulsarAdminException e) {
            log.error("Failed to get status of coordinate topic {}", (Object)this.workerConfig.getClusterCoordinationTopic(), (Object)e);
            throw new RuntimeException(e);
        }
        for (ConsumerStats consumerStats : ((SubscriptionStats)topicStats.subscriptions.get((Object)COORDINATION_TOPIC_SUBSCRIPTION)).consumers) {
            WorkerInfo workerInfo = WorkerInfo.parseFrom((String)((String)consumerStats.metadata.get(WORKER_IDENTIFIER)));
            workerIds.add(workerInfo);
        }
        return workerIds;
    }

    public WorkerInfo getLeader() {
        TopicStats topicStats = null;
        PulsarAdmin pulsarAdmin = this.getPulsarAdminClient();
        try {
            topicStats = pulsarAdmin.topics().getStats(this.workerConfig.getClusterCoordinationTopic());
        }
        catch (PulsarAdminException e) {
            log.error("Failed to get status of coordinate topic {}", (Object)this.workerConfig.getClusterCoordinationTopic(), (Object)e);
            throw new RuntimeException(e);
        }
        String activeConsumerName = ((SubscriptionStats)topicStats.subscriptions.get((Object)COORDINATION_TOPIC_SUBSCRIPTION)).activeConsumerName;
        WorkerInfo leader = null;
        for (ConsumerStats consumerStats : ((SubscriptionStats)topicStats.subscriptions.get((Object)COORDINATION_TOPIC_SUBSCRIPTION)).consumers) {
            if (!consumerStats.consumerName.equals(activeConsumerName)) continue;
            leader = WorkerInfo.parseFrom((String)((String)consumerStats.metadata.get(WORKER_IDENTIFIER)));
        }
        if (leader == null) {
            log.warn("Failed to determine leader in functions cluster");
        }
        return leader;
    }

    @Override
    public void close() throws PulsarClientException {
        this.consumer.close();
        if (this.pulsarAdminClient != null) {
            this.pulsarAdminClient.close();
        }
    }

    public void checkFailures(FunctionMetaDataManager functionMetaDataManager, FunctionRuntimeManager functionRuntimeManager, SchedulerManager schedulerManager) {
        Function.Instance instance;
        Set currentMembership = this.getCurrentMembership().stream().map(entry -> entry.getWorkerId()).collect(Collectors.toSet());
        List<Function.FunctionMetaData> functionMetaDataList = functionMetaDataManager.getAllFunctionMetaData();
        HashMap<String, Function.FunctionMetaData> functionMetaDataMap = new HashMap<String, Function.FunctionMetaData>();
        for (Function.FunctionMetaData entry2 : functionMetaDataList) {
            functionMetaDataMap.put(FunctionDetailsUtils.getFullyQualifiedName((Function.FunctionDetails)entry2.getFunctionDetails()), entry2);
        }
        Map<String, Map<String, Function.Assignment>> currentAssignments = functionRuntimeManager.getCurrentAssignments();
        HashMap<String, Function.Assignment> assignmentMap = new HashMap<String, Function.Assignment>();
        for (Map<String, Function.Assignment> entry3 : currentAssignments.values()) {
            assignmentMap.putAll(entry3);
        }
        long currentTimeMs = System.currentTimeMillis();
        Iterator<Map.Entry<Function.Instance, Long>> it = this.unsignedFunctionDurations.entrySet().iterator();
        while (it.hasNext()) {
            String assignedWorkerId;
            Iterator<Map.Entry<String, Map<String, Function.Assignment>>> entry4 = it.next();
            String fullyQualifiedFunctionName = FunctionDetailsUtils.getFullyQualifiedName((Function.FunctionDetails)((Function.Instance)entry4.getKey()).getFunctionMetaData().getFunctionDetails());
            String fullyQualifiedInstanceId = org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId((Function.Instance)((Function.Instance)entry4.getKey()));
            if (!functionMetaDataMap.containsKey(fullyQualifiedFunctionName)) {
                it.remove();
                continue;
            }
            Function.Assignment assignment2 = (Function.Assignment)assignmentMap.get(fullyQualifiedInstanceId);
            if (assignment2 == null || !currentMembership.contains(assignedWorkerId = assignment2.getWorkerId())) continue;
            it.remove();
        }
        for (Function.FunctionMetaData functionMetaData : functionMetaDataList) {
            Collection<Function.Assignment> assignments = FunctionRuntimeManager.findFunctionAssignments(functionMetaData.getFunctionDetails().getTenant(), functionMetaData.getFunctionDetails().getNamespace(), functionMetaData.getFunctionDetails().getName(), currentAssignments);
            Set assignedInstances = assignments.stream().map(assignment -> assignment.getInstance()).collect(Collectors.toSet());
            HashSet<Function.Instance> instances = new HashSet<Function.Instance>(SchedulerManager.computeInstances(functionMetaData, functionRuntimeManager.getRuntimeFactory().externallyManaged()));
            Iterator iterator = instances.iterator();
            while (iterator.hasNext()) {
                instance = (Function.Instance)iterator.next();
                if (assignedInstances.contains(instance) || this.unsignedFunctionDurations.containsKey(instance)) continue;
                this.unsignedFunctionDurations.put(instance, currentTimeMs);
            }
        }
        for (Map.Entry<String, Map<String, Function.Assignment>> entry5 : currentAssignments.entrySet()) {
            String workerId = entry5.getKey();
            Map<String, Function.Assignment> assignmentEntries = entry5.getValue();
            if (currentMembership.contains(workerId)) continue;
            for (Function.Assignment assignmentEntry : assignmentEntries.values()) {
                instance = assignmentEntry.getInstance();
                if (SchedulerManager.checkHeartBeatFunction(instance) != null || this.unsignedFunctionDurations.containsKey(instance)) continue;
                this.unsignedFunctionDurations.put(instance, currentTimeMs);
            }
        }
        boolean triggerScheduler = false;
        LinkedList<Function.Instance> needSchedule = new LinkedList<Function.Instance>();
        LinkedList<Function.Assignment> needRemove = new LinkedList<Function.Assignment>();
        for (Map.Entry<Function.Instance, Long> entry6 : this.unsignedFunctionDurations.entrySet()) {
            Function.Instance instance2 = entry6.getKey();
            long unassignedDurationMs = entry6.getValue();
            if (currentTimeMs - unassignedDurationMs <= this.workerConfig.getRescheduleTimeoutMs()) continue;
            needSchedule.add(instance2);
            Function.Assignment assignment3 = (Function.Assignment)assignmentMap.get(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId((Function.Instance)instance2));
            if (assignment3 != null) {
                needRemove.add(assignment3);
            }
            triggerScheduler = true;
        }
        if (!needRemove.isEmpty()) {
            functionRuntimeManager.removeAssignments(needRemove);
        }
        if (triggerScheduler) {
            log.info("Functions that need scheduling/rescheduling: {}", needSchedule);
            schedulerManager.schedule();
        }
    }

    private PulsarAdmin getPulsarAdminClient() {
        if (this.pulsarAdminClient == null) {
            this.pulsarAdminClient = Utils.getPulsarAdminClient(this.workerConfig.getPulsarWebServiceUrl(), this.workerConfig.getClientAuthenticationPlugin(), this.workerConfig.getClientAuthenticationParameters(), this.workerConfig.getTlsTrustCertsFilePath(), this.workerConfig.isTlsAllowInsecureConnection());
        }
        return this.pulsarAdminClient;
    }

    private boolean checkLeader(WorkerService service, String consumerName) {
        try {
            TopicStats stats = service.getBrokerAdmin().topics().getStats(service.getWorkerConfig().getClusterCoordinationTopic());
            String activeConsumerName = stats != null && stats.subscriptions.get(COORDINATION_TOPIC_SUBSCRIPTION) != null ? ((SubscriptionStats)stats.subscriptions.get((Object)COORDINATION_TOPIC_SUBSCRIPTION)).activeConsumerName : null;
            return consumerName != null && consumerName.equalsIgnoreCase(activeConsumerName);
        }
        catch (Exception e) {
            log.warn("Failed to check leader {}", (Object)e.getMessage());
            return false;
        }
    }
}

