package org.apache.pulsar.functions.worker.rest.api;

import com.google.common.base.Preconditions;
import com.google.gson.Gson;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Response;
import org.apache.pulsar.common.policies.data.ErrorData;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.runtime.Runtime;
import org.apache.pulsar.functions.runtime.RuntimeSpawner;
import org.apache.pulsar.functions.utils.Utils;
import org.apache.pulsar.functions.worker.FunctionRuntimeInfo;
import org.apache.pulsar.functions.worker.WorkerInfo;
import org.apache.pulsar.functions.worker.WorkerService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/functions/worker/rest/api/WorkerImpl.class */
public class WorkerImpl {
    private static final Logger log = LoggerFactory.getLogger(WorkerImpl.class);
    private final Supplier<WorkerService> workerServiceSupplier;

    public WorkerImpl(Supplier<WorkerService> supplier) {
        this.workerServiceSupplier = supplier;
    }

    private WorkerService worker() {
        try {
            return (WorkerService) Preconditions.checkNotNull(this.workerServiceSupplier.get());
        } catch (Throwable th) {
            log.info("Failed to get worker service", th);
            throw th;
        }
    }

    private boolean isWorkerServiceAvailable() {
        WorkerService workerService = this.workerServiceSupplier.get();
        return workerService != null && workerService.isInitialized();
    }

    public Response getCluster() {
        if (!isWorkerServiceAvailable()) {
            return getUnavailableResponse();
        }
        return Response.status(Response.Status.OK).type("application/json").entity(new Gson().toJson(worker().getMembershipManager().getCurrentMembership())).build();
    }

    public Response getClusterLeader() {
        if (!isWorkerServiceAvailable()) {
            return getUnavailableResponse();
        }
        WorkerInfo leader = worker().getMembershipManager().getLeader();
        if (leader == null) {
            return Response.status(Response.Status.INTERNAL_SERVER_ERROR).type("application/json").entity(new ErrorData("Leader cannot be determined")).build();
        }
        return Response.status(Response.Status.OK).type("application/json").entity(new Gson().toJson(leader)).build();
    }

    public Response getAssignments() {
        if (!isWorkerServiceAvailable()) {
            return getUnavailableResponse();
        }
        Map<String, Map<String, Function.Assignment>> currentAssignments = worker().getFunctionRuntimeManager().getCurrentAssignments();
        HashMap hashMap = new HashMap();
        for (Map.Entry<String, Map<String, Function.Assignment>> entry : currentAssignments.entrySet()) {
            hashMap.put(entry.getKey(), entry.getValue().keySet());
        }
        return Response.status(Response.Status.OK).type("application/json").entity(new Gson().toJson(hashMap)).build();
    }

    private Response getUnavailableResponse() {
        return Response.status(Response.Status.SERVICE_UNAVAILABLE).type("application/json").entity(new ErrorData("Function worker service is not done initializing. Please try again in a little while.")).build();
    }

    public boolean isSuperUser(String str) {
        return str != null && worker().getWorkerConfig().getSuperUserRoles().contains(str);
    }

    public List<Metrics> getWorkerMetrcis(String str) throws IOException {
        if (!worker().getWorkerConfig().isAuthorizationEnabled() || isSuperUser(str)) {
            return getWorkerMetrcis();
        }
        log.error("Client [{}] is not admin and authorized to get function-stats", str);
        throw new WebApplicationException(Response.status(Response.Status.UNAUTHORIZED).type("application/json").entity(new ErrorData(str + " is not authorize to get metrics")).build());
    }

    private List<Metrics> getWorkerMetrcis() {
        if (isWorkerServiceAvailable()) {
            return worker().getMetricsGenerator().generate();
        }
        throw new WebApplicationException(Response.status(Response.Status.SERVICE_UNAVAILABLE).type("application/json").entity(new ErrorData("Function worker service is not avaialable")).build());
    }

    public Response getFunctionsMetrics(String str) throws IOException {
        if (!worker().getWorkerConfig().isAuthorizationEnabled() || isSuperUser(str)) {
            return getFunctionsMetrics();
        }
        log.error("Client [{}] is not admin and authorized to get function-stats", str);
        return Response.status(Response.Status.UNAUTHORIZED).type("application/json").entity(new ErrorData("client is not authorize to perform operation")).build();
    }

    private Response getFunctionsMetrics() throws IOException {
        Runtime runtime;
        if (!isWorkerServiceAvailable()) {
            return getUnavailableResponse();
        }
        WorkerService worker = worker();
        Map<String, FunctionRuntimeInfo> functionRuntimeInfos = worker.getFunctionRuntimeManager().getFunctionRuntimeInfos();
        InstanceCommunication.Metrics.Builder newBuilder = InstanceCommunication.Metrics.newBuilder();
        for (Map.Entry<String, FunctionRuntimeInfo> entry : functionRuntimeInfos.entrySet()) {
            String key = entry.getKey();
            FunctionRuntimeInfo value = entry.getValue();
            RuntimeSpawner runtimeSpawner = value.getRuntimeSpawner();
            if (runtimeSpawner != null && (runtime = runtimeSpawner.getRuntime()) != null) {
                try {
                    InstanceCommunication.MetricsData metricsData = worker.getWorkerConfig().getMetricsSamplingPeriodSec() > 0 ? (InstanceCommunication.MetricsData) runtime.getMetrics().get() : (InstanceCommunication.MetricsData) runtime.getAndResetMetrics().get();
                    String tenant = value.getFunctionInstance().getFunctionMetaData().getFunctionDetails().getTenant();
                    String namespace = value.getFunctionInstance().getFunctionMetaData().getFunctionDetails().getNamespace();
                    String name = value.getFunctionInstance().getFunctionMetaData().getFunctionDetails().getName();
                    int instanceId = value.getFunctionInstance().getInstanceId();
                    String format = String.format("%s/%s/%s", tenant, namespace, name);
                    InstanceCommunication.Metrics.InstanceMetrics.Builder newBuilder2 = InstanceCommunication.Metrics.InstanceMetrics.newBuilder();
                    newBuilder2.setName(format);
                    newBuilder2.setInstanceId(instanceId);
                    if (metricsData != null) {
                        newBuilder2.setMetricsData(metricsData);
                    }
                    newBuilder.addMetrics(newBuilder2.build());
                } catch (InterruptedException | ExecutionException e) {
                    log.warn("Failed to collect metrics for function instance {}", key, e);
                }
            }
        }
        return Response.status(Response.Status.OK).entity(Utils.printJson(newBuilder)).build();
    }
}
