/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.worker;

import com.google.common.annotations.VisibleForTesting;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Request;
import org.apache.pulsar.functions.utils.FunctionMetaDataUtils;
import org.apache.pulsar.functions.worker.ErrorNotifier;
import org.apache.pulsar.functions.worker.FunctionMetaDataTopicTailer;
import org.apache.pulsar.functions.worker.SchedulerManager;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.request.RequestResult;
import org.apache.pulsar.functions.worker.request.ServiceRequestInfo;
import org.apache.pulsar.functions.worker.request.ServiceRequestManager;
import org.apache.pulsar.functions.worker.request.ServiceRequestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Holds the worker's in-memory view of function metadata and mediates changes to it.
 *
 * <p>Metadata is stored in a nested map keyed by tenant, then namespace, then function name.
 * Changes are not applied directly: {@link #updateFunction}, {@link #deregisterFunction} and
 * {@link #changeFunctionInstanceStatus} build a {@link Request.ServiceRequest} and hand it to the
 * {@link ServiceRequestManager}; the in-memory map is only modified when a request is passed to
 * {@link #processRequest}. Requests submitted by this instance are remembered until they are
 * processed (or fail to be submitted) so that the returned future can be completed.
 *
 * <p>All methods that read or modify the metadata map synchronize on this instance.
 */
public class FunctionMetaDataManager implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(FunctionMetaDataManager.class);

    /** Result message used whenever a request carries a version that is not newer than the stored one. */
    private static final String OUTDATED_REQUEST_MESSAGE = "Request ignored because it is out of date. Please try again.";

    /** tenant -> namespace -> function name -> metadata. */
    @VisibleForTesting
    final Map<String, Map<String, Map<String, Function.FunctionMetaData>>> functionMetaDataMap = new ConcurrentHashMap<>();
    /** Requests submitted by this instance that have not been completed yet, keyed by request id. */
    private final Map<String, ServiceRequestInfo> pendingServiceRequests = new ConcurrentHashMap<>();
    private final ServiceRequestManager serviceRequestManager;
    private final SchedulerManager schedulerManager;
    private final WorkerConfig workerConfig;
    private final PulsarClient pulsarClient;
    private final ErrorNotifier errorNotifier;
    private FunctionMetaDataTopicTailer functionMetaDataTopicTailer;
    /** While true, processed requests do not trigger the scheduler (see {@link #initialize()}). */
    boolean isInitializePhase = false;

    /**
     * Creates the manager and the producer-backed {@link ServiceRequestManager} for the configured
     * function metadata topic.
     *
     * @param workerConfig     worker configuration (supplies the metadata topic and the worker id)
     * @param schedulerManager scheduler to invoke when metadata changes
     * @param pulsarClient     client used to create the producer and, later, the reader
     * @param errorNotifier    passed through to the topic tailer created in {@link #initialize()}
     * @throws PulsarClientException if the producer cannot be created
     */
    public FunctionMetaDataManager(WorkerConfig workerConfig, SchedulerManager schedulerManager, PulsarClient pulsarClient, ErrorNotifier errorNotifier) throws PulsarClientException {
        this.workerConfig = workerConfig;
        this.pulsarClient = pulsarClient;
        this.serviceRequestManager = this.getServiceRequestManager(this.pulsarClient, this.workerConfig.getFunctionMetadataTopic());
        this.schedulerManager = schedulerManager;
        this.errorNotifier = errorNotifier;
    }

    /**
     * Creates the topic tailer, feeds every message currently available on its reader through the
     * tailer while in the initialize phase, then invokes the scheduler once and starts the tailer.
     *
     * @throws RuntimeException wrapping any exception raised during initialization
     */
    public void initialize() {
        log.info("/** Initializing Function Metadata Manager **/");
        try {
            this.functionMetaDataTopicTailer = new FunctionMetaDataTopicTailer(this, this.pulsarClient.newReader(), this.workerConfig, this.errorNotifier);
            // Suppress per-request scheduling while draining the backlog; schedule once afterwards.
            this.setInitializePhase(true);
            while (this.functionMetaDataTopicTailer.getReader().hasMessageAvailable()) {
                this.functionMetaDataTopicTailer.processRequest((Message<byte[]>) this.functionMetaDataTopicTailer.getReader().readNext());
            }
            this.setInitializePhase(false);
            this.schedulerManager.schedule();
            this.functionMetaDataTopicTailer.start();
        } catch (Exception e) {
            log.error("Failed to initialize meta data store", e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Looks up the metadata stored for a function.
     *
     * @return the stored metadata, or {@code null} if the tenant, namespace or function is unknown
     */
    public synchronized Function.FunctionMetaData getFunctionMetaData(String tenant, String namespace, String functionName) {
        Map<String, Function.FunctionMetaData> functions = this.functionsIn(tenant, namespace);
        return functions == null ? null : functions.get(functionName);
    }

    /**
     * Returns a new list containing the metadata of every function in every tenant and namespace.
     */
    public synchronized List<Function.FunctionMetaData> getAllFunctionMetaData() {
        LinkedList<Function.FunctionMetaData> ret = new LinkedList<>();
        for (Map<String, Map<String, Function.FunctionMetaData>> namespaces : this.functionMetaDataMap.values()) {
            for (Map<String, Function.FunctionMetaData> functions : namespaces.values()) {
                ret.addAll(functions.values());
            }
        }
        return ret;
    }

    /**
     * Returns a new collection with the metadata of every function in the given namespace.
     *
     * @return the functions of the namespace; empty if the tenant or namespace is unknown
     */
    public synchronized Collection<Function.FunctionMetaData> listFunctions(String tenant, String namespace) {
        LinkedList<Function.FunctionMetaData> ret = new LinkedList<>();
        Map<String, Function.FunctionMetaData> functions = this.functionsIn(tenant, namespace);
        if (functions != null) {
            ret.addAll(functions.values());
        }
        return ret;
    }

    /**
     * Tells whether metadata is currently stored for the given function.
     */
    public synchronized boolean containsFunction(String tenant, String namespace, String functionName) {
        return this.containsFunctionMetaData(tenant, namespace, functionName);
    }

    /**
     * Submits an update request for the given function. The metadata sent is produced by
     * {@link FunctionMetaDataUtils#generateUpdatedMetadata} from the currently stored metadata
     * ({@code null} if the function is not stored yet) and the supplied metadata.
     *
     * @param functionMetaData the desired metadata
     * @return a future completed once the request has been processed, or completed exceptionally
     *         if the request could not be submitted
     */
    public synchronized CompletableFuture<RequestResult> updateFunction(Function.FunctionMetaData functionMetaData) {
        Function.FunctionDetails details = functionMetaData.getFunctionDetails();
        Function.FunctionMetaData existingFunctionMetadata = null;
        if (this.containsFunction(details.getTenant(), details.getNamespace(), details.getName())) {
            existingFunctionMetadata = this.getFunctionMetaData(details.getTenant(), details.getNamespace(), details.getName());
        }
        Function.FunctionMetaData newFunctionMetaData = FunctionMetaDataUtils.generateUpdatedMetadata(existingFunctionMetadata, functionMetaData);
        Request.ServiceRequest updateRequest = ServiceRequestUtils.getUpdateRequest(this.workerConfig.getWorkerId(), newFunctionMetaData);
        return this.submit(updateRequest);
    }

    /**
     * Submits a deregister request for the given function.
     *
     * @return a future completed once the request has been processed, or completed exceptionally
     *         if the request could not be submitted; if the function is not stored, an already
     *         completed future holding an unsuccessful {@link RequestResult}
     */
    public synchronized CompletableFuture<RequestResult> deregisterFunction(String tenant, String namespace, String functionName) {
        Function.FunctionMetaData functionMetaData = this.getFunctionMetaData(tenant, namespace, functionName);
        if (functionMetaData == null) {
            // The function may have been removed between the caller's existence check and this call.
            return functionNotFound(tenant, namespace, functionName);
        }
        Function.FunctionMetaData newFunctionMetaData = FunctionMetaDataUtils.generateUpdatedMetadata(functionMetaData, functionMetaData);
        Request.ServiceRequest deregisterRequest = ServiceRequestUtils.getDeregisterRequest(this.workerConfig.getWorkerId(), newFunctionMetaData);
        return this.submit(deregisterRequest);
    }

    /**
     * Submits an update request carrying the metadata produced by
     * {@link FunctionMetaDataUtils#changeFunctionInstanceStatus} for the given function.
     *
     * @param instanceId instance identifier forwarded to the utility method
     * @param start      start flag forwarded to the utility method
     * @return a future completed once the request has been processed, or completed exceptionally
     *         if the request could not be submitted; if the function is not stored, an already
     *         completed future holding an unsuccessful {@link RequestResult}
     */
    public synchronized CompletableFuture<RequestResult> changeFunctionInstanceStatus(String tenant, String namespace, String functionName, Integer instanceId, boolean start) {
        Function.FunctionMetaData functionMetaData = this.getFunctionMetaData(tenant, namespace, functionName);
        if (functionMetaData == null) {
            // The function may have been removed between the caller's existence check and this call.
            return functionNotFound(tenant, namespace, functionName);
        }
        Function.FunctionMetaData newFunctionMetaData = FunctionMetaDataUtils.changeFunctionInstanceStatus(functionMetaData, instanceId, start);
        Request.ServiceRequest updateRequest = ServiceRequestUtils.getUpdateRequest(this.workerConfig.getWorkerId(), newFunctionMetaData);
        return this.submit(updateRequest);
    }

    /**
     * Applies a service request to the in-memory metadata, dispatching on its request type.
     * Requests of any type other than UPDATE or DELETE are logged and ignored.
     *
     * @param messageId      id of the message the request came from (not used here)
     * @param serviceRequest the request to apply
     */
    public synchronized void processRequest(MessageId messageId, Request.ServiceRequest serviceRequest) {
        switch (serviceRequest.getServiceRequestType()) {
            case UPDATE: {
                this.processUpdate(serviceRequest);
                break;
            }
            case DELETE: {
                this.proccessDeregister(serviceRequest);
                break;
            }
            default: {
                log.warn("Received request with unrecognized type: {}", serviceRequest);
            }
        }
    }

    private boolean containsFunctionMetaData(Function.FunctionMetaData functionMetaData) {
        return this.containsFunctionMetaData(functionMetaData.getFunctionDetails());
    }

    private boolean containsFunctionMetaData(Function.FunctionDetails functionDetails) {
        return this.containsFunctionMetaData(functionDetails.getTenant(), functionDetails.getNamespace(), functionDetails.getName());
    }

    private boolean containsFunctionMetaData(String tenant, String namespace, String functionName) {
        Map<String, Function.FunctionMetaData> functions = this.functionsIn(tenant, namespace);
        return functions != null && functions.containsKey(functionName);
    }

    /**
     * Returns the function-name map of a namespace, or {@code null} if the tenant or the namespace
     * has no entry.
     */
    private Map<String, Function.FunctionMetaData> functionsIn(String tenant, String namespace) {
        Map<String, Map<String, Function.FunctionMetaData>> namespaces = this.functionMetaDataMap.get(tenant);
        return namespaces == null ? null : namespaces.get(namespace);
    }

    /**
     * Applies a deregister request. A stored function is removed unless the request is outdated;
     * a request for a function that is not stored completes successfully without changes. The
     * scheduler is invoked when a function was removed and the manager is not initializing.
     */
    @VisibleForTesting
    synchronized void proccessDeregister(Request.ServiceRequest deregisterRequest) {
        Function.FunctionMetaData deregisterRequestFs = deregisterRequest.getFunctionMetaData();
        String functionName = deregisterRequestFs.getFunctionDetails().getName();
        String tenant = deregisterRequestFs.getFunctionDetails().getTenant();
        String namespace = deregisterRequestFs.getFunctionDetails().getNamespace();
        boolean needsScheduling = false;
        log.debug("Process deregister request: {}", deregisterRequest);
        if (this.containsFunctionMetaData(deregisterRequestFs)) {
            if (!this.isRequestOutdated(deregisterRequest)) {
                this.functionsIn(tenant, namespace).remove(functionName);
                this.completeRequest(deregisterRequest, true);
                needsScheduling = true;
            } else {
                if (log.isDebugEnabled()) {
                    log.debug("{}/{}/{} Ignoring outdated request version: {}", tenant, namespace, functionName, deregisterRequest.getFunctionMetaData().getVersion());
                }
                this.completeRequest(deregisterRequest, false, OUTDATED_REQUEST_MESSAGE);
            }
        } else {
            // Nothing stored for this function: treat the deregistration as already done.
            this.completeRequest(deregisterRequest, true);
        }
        if (!this.isInitializePhase() && needsScheduling) {
            this.schedulerManager.schedule();
        }
    }

    /**
     * Applies an update request. The metadata is stored when the function is not stored yet or
     * when the request is not outdated; otherwise the request is completed as unsuccessful. The
     * scheduler is invoked when metadata was stored and the manager is not initializing.
     */
    @VisibleForTesting
    synchronized void processUpdate(Request.ServiceRequest updateRequest) {
        log.debug("Process update request: {}", updateRequest);
        Function.FunctionMetaData updateRequestFs = updateRequest.getFunctionMetaData();
        boolean needsScheduling = false;
        if (!this.containsFunctionMetaData(updateRequestFs) || !this.isRequestOutdated(updateRequest)) {
            this.setFunctionMetaData(updateRequestFs);
            needsScheduling = true;
            this.completeRequest(updateRequest, true);
        } else {
            this.completeRequest(updateRequest, false, OUTDATED_REQUEST_MESSAGE);
        }
        if (!this.isInitializePhase() && needsScheduling) {
            this.schedulerManager.schedule();
        }
    }

    /**
     * Completes the future of the pending request with the same request id, if this instance
     * submitted one, and forgets the request. Requests without a pending entry are ignored.
     */
    private void completeRequest(Request.ServiceRequest serviceRequest, boolean isSuccess, String message) {
        // remove (rather than get) so that completed requests do not accumulate in the map
        ServiceRequestInfo pendingServiceRequestInfo = this.pendingServiceRequests.remove(serviceRequest.getRequestId());
        if (pendingServiceRequestInfo != null) {
            RequestResult requestResult = new RequestResult();
            requestResult.setSuccess(isSuccess);
            requestResult.setMessage(message);
            pendingServiceRequestInfo.getRequestResultCompletableFuture().complete(requestResult);
        }
    }

    private void completeRequest(Request.ServiceRequest serviceRequest, boolean isSuccess) {
        this.completeRequest(serviceRequest, isSuccess, null);
    }

    /**
     * A request is outdated when the stored metadata's version is greater than or equal to the
     * version carried by the request. If nothing is stored for the function the request is not
     * considered outdated.
     */
    private boolean isRequestOutdated(Request.ServiceRequest serviceRequest) {
        Function.FunctionMetaData requestFunctionMetaData = serviceRequest.getFunctionMetaData();
        Function.FunctionDetails functionDetails = requestFunctionMetaData.getFunctionDetails();
        Map<String, Function.FunctionMetaData> functions = this.functionsIn(functionDetails.getTenant(), functionDetails.getNamespace());
        Function.FunctionMetaData currentFunctionMetaData = functions == null ? null : functions.get(functionDetails.getName());
        if (currentFunctionMetaData == null) {
            return false;
        }
        return currentFunctionMetaData.getVersion() >= requestFunctionMetaData.getVersion();
    }

    /**
     * Stores the metadata under its tenant, namespace and name, creating the intermediate maps
     * when they do not exist yet and replacing any previously stored metadata.
     */
    @VisibleForTesting
    void setFunctionMetaData(Function.FunctionMetaData functionMetaData) {
        Function.FunctionDetails functionDetails = functionMetaData.getFunctionDetails();
        this.functionMetaDataMap
                .computeIfAbsent(functionDetails.getTenant(), tenantKey -> new ConcurrentHashMap<>())
                .computeIfAbsent(functionDetails.getNamespace(), namespaceKey -> new ConcurrentHashMap<>())
                .put(functionDetails.getName(), functionMetaData);
    }

    /**
     * Hands the request to the {@link ServiceRequestManager} and registers it as pending.
     *
     * @return a future that is completed when a request with the same id is processed, or
     *         completed exceptionally if submitting the request fails
     */
    @VisibleForTesting
    CompletableFuture<RequestResult> submit(Request.ServiceRequest serviceRequest) {
        ServiceRequestInfo serviceRequestInfo = ServiceRequestInfo.of(serviceRequest);
        CompletableFuture<MessageId> messageIdCompletableFuture = this.serviceRequestManager.submitRequest(serviceRequest);
        serviceRequestInfo.setCompletableFutureRequestMessageId(messageIdCompletableFuture);
        CompletableFuture<RequestResult> requestResultCompletableFuture = new CompletableFuture<>();
        serviceRequestInfo.setRequestResultCompletableFuture(requestResultCompletableFuture);
        String requestId = serviceRequestInfo.getServiceRequest().getRequestId();
        this.pendingServiceRequests.put(requestId, serviceRequestInfo);
        messageIdCompletableFuture.exceptionally(ex -> {
            Function.FunctionDetails metadata = serviceRequest.getFunctionMetaData().getFunctionDetails();
            // The trailing throwable is logged with its stack trace in addition to the formatted message.
            log.warn("Failed to submit function metadata for {}/{}/{}-{}", metadata.getTenant(), metadata.getNamespace(), metadata.getName(), ex.getMessage(), ex);
            // The request will never be processed, so it must not stay in the pending map.
            this.pendingServiceRequests.remove(requestId);
            serviceRequestInfo.getRequestResultCompletableFuture().completeExceptionally(new RuntimeException("Failed to submit function metadata", ex));
            return null;
        });
        return requestResultCompletableFuture;
    }

    /**
     * Builds an already completed future holding an unsuccessful result for a function that is
     * not stored.
     */
    private static CompletableFuture<RequestResult> functionNotFound(String tenant, String namespace, String functionName) {
        RequestResult requestResult = new RequestResult();
        requestResult.setSuccess(false);
        requestResult.setMessage(String.format("Function %s/%s/%s does not exist", tenant, namespace, functionName));
        return CompletableFuture.completedFuture(requestResult);
    }

    /**
     * Closes the topic tailer (if one was created) and the service request manager. The service
     * request manager is closed even when closing the tailer throws.
     */
    @Override
    public void close() throws Exception {
        try {
            if (this.functionMetaDataTopicTailer != null) {
                this.functionMetaDataTopicTailer.close();
            }
        } finally {
            if (this.serviceRequestManager != null) {
                this.serviceRequestManager.close();
            }
        }
    }

    /**
     * Creates a {@link ServiceRequestManager} around a new producer on the given topic. The
     * producer is named after the worker id with the suffix {@code -function-metadata-manager}.
     */
    private ServiceRequestManager getServiceRequestManager(PulsarClient pulsarClient, String functionMetadataTopic) throws PulsarClientException {
        return new ServiceRequestManager(pulsarClient.newProducer().topic(functionMetadataTopic).producerName(this.workerConfig.getWorkerId() + "-function-metadata-manager").create());
    }

    public void setInitializePhase(boolean isInitializePhase) {
        this.isInitializePhase = isInitializePhase;
    }

    public boolean isInitializePhase() {
        return this.isInitializePhase;
    }
}

