package org.apache.pulsar.functions.worker;

import com.google.common.annotations.VisibleForTesting;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Request;
import org.apache.pulsar.functions.utils.FunctionMetaDataUtils;
import org.apache.pulsar.functions.worker.request.RequestResult;
import org.apache.pulsar.functions.worker.request.ServiceRequestInfo;
import org.apache.pulsar.functions.worker.request.ServiceRequestManager;
import org.apache.pulsar.functions.worker.request.ServiceRequestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/functions/worker/FunctionMetaDataManager.class */
public class FunctionMetaDataManager implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(FunctionMetaDataManager.class);
    private final ServiceRequestManager serviceRequestManager;
    private final SchedulerManager schedulerManager;
    private final WorkerConfig workerConfig;
    private final PulsarClient pulsarClient;
    private final ErrorNotifier errorNotifier;
    private FunctionMetaDataTopicTailer functionMetaDataTopicTailer;

    @VisibleForTesting
    final Map<String, Map<String, Map<String, Function.FunctionMetaData>>> functionMetaDataMap = new ConcurrentHashMap();
    private final Map<String, ServiceRequestInfo> pendingServiceRequests = new ConcurrentHashMap();
    boolean isInitializePhase = false;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.pulsar.functions.worker.FunctionMetaDataManager$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/pulsar/functions/worker/FunctionMetaDataManager$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$pulsar$functions$proto$Request$ServiceRequest$ServiceRequestType = new int[Request.ServiceRequest.ServiceRequestType.values().length];

        static {
            try {
                $SwitchMap$org$apache$pulsar$functions$proto$Request$ServiceRequest$ServiceRequestType[Request.ServiceRequest.ServiceRequestType.UPDATE.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$pulsar$functions$proto$Request$ServiceRequest$ServiceRequestType[Request.ServiceRequest.ServiceRequestType.DELETE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    public FunctionMetaDataManager(WorkerConfig workerConfig, SchedulerManager schedulerManager, PulsarClient pulsarClient, ErrorNotifier errorNotifier) throws PulsarClientException {
        this.workerConfig = workerConfig;
        this.pulsarClient = pulsarClient;
        this.serviceRequestManager = getServiceRequestManager(this.pulsarClient, this.workerConfig.getFunctionMetadataTopic());
        this.schedulerManager = schedulerManager;
        this.errorNotifier = errorNotifier;
    }

    public void initialize() {
        log.info("/** Initializing Function Metadata Manager **/");
        try {
            this.functionMetaDataTopicTailer = new FunctionMetaDataTopicTailer(this, this.pulsarClient.newReader(), this.workerConfig, this.errorNotifier);
            setInitializePhase(true);
            while (this.functionMetaDataTopicTailer.getReader().hasMessageAvailable()) {
                this.functionMetaDataTopicTailer.processRequest(this.functionMetaDataTopicTailer.getReader().readNext());
            }
            setInitializePhase(false);
            this.schedulerManager.schedule();
            this.functionMetaDataTopicTailer.start();
        } catch (Exception e) {
            log.error("Failed to initialize meta data store", e);
            throw new RuntimeException(e);
        }
    }

    public synchronized Function.FunctionMetaData getFunctionMetaData(String str, String str2, String str3) {
        return this.functionMetaDataMap.get(str).get(str2).get(str3);
    }

    public synchronized List<Function.FunctionMetaData> getAllFunctionMetaData() {
        LinkedList linkedList = new LinkedList();
        Iterator<Map<String, Map<String, Function.FunctionMetaData>>> it = this.functionMetaDataMap.values().iterator();
        while (it.hasNext()) {
            Iterator<Map<String, Function.FunctionMetaData>> it2 = it.next().values().iterator();
            while (it2.hasNext()) {
                linkedList.addAll(it2.next().values());
            }
        }
        return linkedList;
    }

    public synchronized Collection<Function.FunctionMetaData> listFunctions(String str, String str2) {
        LinkedList linkedList = new LinkedList();
        if (this.functionMetaDataMap.containsKey(str) && this.functionMetaDataMap.get(str).containsKey(str2)) {
            Iterator<Function.FunctionMetaData> it = this.functionMetaDataMap.get(str).get(str2).values().iterator();
            while (it.hasNext()) {
                linkedList.add(it.next());
            }
            return linkedList;
        }
        return linkedList;
    }

    public synchronized boolean containsFunction(String str, String str2, String str3) {
        return containsFunctionMetaData(str, str2, str3);
    }

    public synchronized CompletableFuture<RequestResult> updateFunction(Function.FunctionMetaData functionMetaData) {
        Function.FunctionMetaData functionMetaData2 = null;
        if (containsFunction(functionMetaData.getFunctionDetails().getTenant(), functionMetaData.getFunctionDetails().getNamespace(), functionMetaData.getFunctionDetails().getName())) {
            functionMetaData2 = getFunctionMetaData(functionMetaData.getFunctionDetails().getTenant(), functionMetaData.getFunctionDetails().getNamespace(), functionMetaData.getFunctionDetails().getName());
        }
        return submit(ServiceRequestUtils.getUpdateRequest(this.workerConfig.getWorkerId(), FunctionMetaDataUtils.generateUpdatedMetadata(functionMetaData2, functionMetaData)));
    }

    public synchronized CompletableFuture<RequestResult> deregisterFunction(String str, String str2, String str3) {
        Function.FunctionMetaData functionMetaData = this.functionMetaDataMap.get(str).get(str2).get(str3);
        return submit(ServiceRequestUtils.getDeregisterRequest(this.workerConfig.getWorkerId(), FunctionMetaDataUtils.generateUpdatedMetadata(functionMetaData, functionMetaData)));
    }

    public synchronized CompletableFuture<RequestResult> changeFunctionInstanceStatus(String str, String str2, String str3, Integer num, boolean z) {
        return submit(ServiceRequestUtils.getUpdateRequest(this.workerConfig.getWorkerId(), FunctionMetaDataUtils.changeFunctionInstanceStatus(this.functionMetaDataMap.get(str).get(str2).get(str3), num, z)));
    }

    public void processRequest(MessageId messageId, Request.ServiceRequest serviceRequest) {
        synchronized (this) {
            switch (AnonymousClass1.$SwitchMap$org$apache$pulsar$functions$proto$Request$ServiceRequest$ServiceRequestType[serviceRequest.getServiceRequestType().ordinal()]) {
                case 1:
                    processUpdate(serviceRequest);
                    break;
                case 2:
                    proccessDeregister(serviceRequest);
                    break;
                default:
                    log.warn("Received request with unrecognized type: {}", serviceRequest);
                    break;
            }
        }
    }

    private boolean containsFunctionMetaData(Function.FunctionMetaData functionMetaData) {
        return containsFunctionMetaData(functionMetaData.getFunctionDetails());
    }

    private boolean containsFunctionMetaData(Function.FunctionDetails functionDetails) {
        return containsFunctionMetaData(functionDetails.getTenant(), functionDetails.getNamespace(), functionDetails.getName());
    }

    private boolean containsFunctionMetaData(String str, String str2, String str3) {
        return this.functionMetaDataMap.containsKey(str) && this.functionMetaDataMap.get(str).containsKey(str2) && this.functionMetaDataMap.get(str).get(str2).containsKey(str3);
    }

    @VisibleForTesting
    synchronized void proccessDeregister(Request.ServiceRequest serviceRequest) {
        Function.FunctionMetaData functionMetaData = serviceRequest.getFunctionMetaData();
        String name = functionMetaData.getFunctionDetails().getName();
        String tenant = functionMetaData.getFunctionDetails().getTenant();
        String namespace = functionMetaData.getFunctionDetails().getNamespace();
        boolean z = false;
        log.debug("Process deregister request: {}", serviceRequest);
        if (!containsFunctionMetaData(functionMetaData)) {
            completeRequest(serviceRequest, true);
        } else if (isRequestOutdated(serviceRequest)) {
            if (log.isDebugEnabled()) {
                log.debug("{}/{}/{} Ignoring outdated request version: {}", new Object[]{tenant, namespace, name, Long.valueOf(serviceRequest.getFunctionMetaData().getVersion())});
            }
            completeRequest(serviceRequest, false, "Request ignored because it is out of date. Please try again.");
        } else {
            this.functionMetaDataMap.get(tenant).get(namespace).remove(name);
            completeRequest(serviceRequest, true);
            z = true;
        }
        if (isInitializePhase() || !z) {
            return;
        }
        this.schedulerManager.schedule();
    }

    @VisibleForTesting
    synchronized void processUpdate(Request.ServiceRequest serviceRequest) {
        log.debug("Process update request: {}", serviceRequest);
        Function.FunctionMetaData functionMetaData = serviceRequest.getFunctionMetaData();
        boolean z = false;
        if (!containsFunctionMetaData(functionMetaData)) {
            setFunctionMetaData(functionMetaData);
            z = true;
            completeRequest(serviceRequest, true);
        } else if (isRequestOutdated(serviceRequest)) {
            completeRequest(serviceRequest, false, "Request ignored because it is out of date. Please try again.");
        } else {
            setFunctionMetaData(functionMetaData);
            z = true;
            completeRequest(serviceRequest, true);
        }
        if (isInitializePhase() || !z) {
            return;
        }
        this.schedulerManager.schedule();
    }

    private void completeRequest(Request.ServiceRequest serviceRequest, boolean z, String str) {
        ServiceRequestInfo orDefault = this.pendingServiceRequests.getOrDefault(serviceRequest.getRequestId(), null);
        if (orDefault != null) {
            RequestResult requestResult = new RequestResult();
            requestResult.setSuccess(z);
            requestResult.setMessage(str);
            orDefault.getRequestResultCompletableFuture().complete(requestResult);
        }
    }

    private void completeRequest(Request.ServiceRequest serviceRequest, boolean z) {
        completeRequest(serviceRequest, z, null);
    }

    private boolean isRequestOutdated(Request.ServiceRequest serviceRequest) {
        Function.FunctionMetaData functionMetaData = serviceRequest.getFunctionMetaData();
        Function.FunctionDetails functionDetails = functionMetaData.getFunctionDetails();
        return this.functionMetaDataMap.get(functionDetails.getTenant()).get(functionDetails.getNamespace()).get(functionDetails.getName()).getVersion() >= functionMetaData.getVersion();
    }

    @VisibleForTesting
    void setFunctionMetaData(Function.FunctionMetaData functionMetaData) {
        Function.FunctionDetails functionDetails = functionMetaData.getFunctionDetails();
        if (!this.functionMetaDataMap.containsKey(functionDetails.getTenant())) {
            this.functionMetaDataMap.put(functionDetails.getTenant(), new ConcurrentHashMap());
        }
        if (!this.functionMetaDataMap.get(functionDetails.getTenant()).containsKey(functionDetails.getNamespace())) {
            this.functionMetaDataMap.get(functionDetails.getTenant()).put(functionDetails.getNamespace(), new ConcurrentHashMap());
        }
        this.functionMetaDataMap.get(functionDetails.getTenant()).get(functionDetails.getNamespace()).put(functionDetails.getName(), functionMetaData);
    }

    @VisibleForTesting
    CompletableFuture<RequestResult> submit(Request.ServiceRequest serviceRequest) {
        ServiceRequestInfo of = ServiceRequestInfo.of(serviceRequest);
        CompletableFuture<MessageId> submitRequest = this.serviceRequestManager.submitRequest(serviceRequest);
        of.setCompletableFutureRequestMessageId(submitRequest);
        CompletableFuture<RequestResult> completableFuture = new CompletableFuture<>();
        of.setRequestResultCompletableFuture(completableFuture);
        this.pendingServiceRequests.put(of.getServiceRequest().getRequestId(), of);
        submitRequest.exceptionally(th -> {
            Function.FunctionDetails functionDetails = serviceRequest.getFunctionMetaData().getFunctionDetails();
            log.warn("Failed to submit function metadata for {}/{}/{}-{}", new Object[]{functionDetails.getTenant(), functionDetails.getNamespace(), functionDetails.getName(), th.getMessage()});
            of.getRequestResultCompletableFuture().completeExceptionally(new RuntimeException("Failed to submit function metadata"));
            return null;
        });
        return completableFuture;
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        if (this.functionMetaDataTopicTailer != null) {
            this.functionMetaDataTopicTailer.close();
        }
        if (this.serviceRequestManager != null) {
            this.serviceRequestManager.close();
        }
    }

    private ServiceRequestManager getServiceRequestManager(PulsarClient pulsarClient, String str) throws PulsarClientException {
        return new ServiceRequestManager(pulsarClient.newProducer().topic(str).producerName(this.workerConfig.getWorkerId() + "-function-metadata-manager").create());
    }

    public void setInitializePhase(boolean z) {
        this.isInitializePhase = z;
    }

    public boolean isInitializePhase() {
        return this.isInitializePhase;
    }
}
