/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.worker.rest.api;

import com.google.common.base.Preconditions;
import com.google.gson.Gson;
import com.google.protobuf.AbstractMessage;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Base64;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput;
import javax.ws.rs.core.UriBuilder;
import net.jodah.typetools.TypeResolver;
import org.apache.bookkeeper.api.StorageClient;
import org.apache.bookkeeper.api.kv.Table;
import org.apache.bookkeeper.api.kv.result.KeyValue;
import org.apache.bookkeeper.clients.StorageClientBuilder;
import org.apache.bookkeeper.clients.config.StorageClientSettings;
import org.apache.bookkeeper.clients.exceptions.NamespaceNotFoundException;
import org.apache.bookkeeper.clients.exceptions.StreamNotFoundException;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.FunctionState;
import org.apache.pulsar.common.functions.WorkerInfo;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.io.SourceConfig;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.FunctionStats;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.runtime.RuntimeSpawner;
import org.apache.pulsar.functions.sink.PulsarSink;
import org.apache.pulsar.functions.utils.FunctionConfigUtils;
import org.apache.pulsar.functions.utils.Reflections;
import org.apache.pulsar.functions.utils.SinkConfigUtils;
import org.apache.pulsar.functions.utils.SourceConfigUtils;
import org.apache.pulsar.functions.utils.StateUtils;
import org.apache.pulsar.functions.utils.Utils;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.worker.FunctionRuntimeInfo;
import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
import org.apache.pulsar.functions.worker.Utils;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.request.RequestResult;
import org.apache.pulsar.functions.worker.rest.RestException;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.Source;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class ComponentImpl {
    // Class-wide SLF4J logger.
    private static final Logger log = LoggerFactory.getLogger(ComponentImpl.class);
    // Holder for a BookKeeper StorageClient. NOTE(review): presumably populated lazily by
    // state-related code further down in this class -- confirm against the rest of the file.
    private final AtomicReference<StorageClient> storageClient = new AtomicReference();
    // Source of the WorkerService; read by worker() and isWorkerServiceAvailable().
    protected final Supplier<WorkerService> workerServiceSupplier;
    // Kind of component this instance handles; appears in log/error messages and selects
    // type-specific handling in the REST operations.
    protected final Utils.ComponentType componentType;

    /**
     * Creates a component handler.
     *
     * @param workerServiceSupplier supplier from which the {@link WorkerService} is obtained
     * @param componentType the kind of component this handler operates on
     */
    public ComponentImpl(Supplier<WorkerService> workerServiceSupplier, Utils.ComponentType componentType) {
        this.componentType = componentType;
        this.workerServiceSupplier = workerServiceSupplier;
    }

    /**
     * Returns the worker service obtained from the supplier.
     *
     * <p>Any failure (including a {@code null} result rejected by
     * {@link Preconditions#checkNotNull(Object)}) is logged at info level and rethrown unchanged.
     *
     * @return the non-null worker service
     */
    protected WorkerService worker() {
        final WorkerService service;
        try {
            service = Preconditions.checkNotNull(this.workerServiceSupplier.get());
        }
        catch (Throwable failure) {
            log.info("Failed to get worker service", failure);
            throw failure;
        }
        return service;
    }

    /**
     * Tells whether a worker service is present and reports itself as initialized.
     *
     * @return {@code false} when the supplier yields {@code null}; otherwise the service's
     *         {@code isInitialized()} value
     */
    boolean isWorkerServiceAvailable() {
        final WorkerService candidate = this.workerServiceSupplier.get();
        return candidate != null && candidate.isInitialized();
    }

    /**
     * Registers a new component (function, source or sink, per {@code componentType}).
     *
     * <p>Steps, in order: availability check, presence of tenant/namespace/name, tenant and
     * namespace existence via the broker admin client, role authorization, duplicate check,
     * request validation, runtime admission check, package location resolution (which may upload
     * the package), and finally submission of the metadata through {@code updateRequest}.
     *
     * @param tenant tenant of the component; must not be {@code null}
     * @param namespace namespace of the component; must not be {@code null}
     * @param componentName name of the component; must not be {@code null}
     * @param uploadedInputStream uploaded package contents, or {@code null} when none was uploaded
     * @param fileDetail multipart details of the uploaded package
     * @param functionPkgUrl package URL; when non-blank it takes precedence over the upload
     * @param functionDetailsJson serialized function details, passed to validation
     * @param componentConfigJson serialized component config, passed to validation
     * @param clientRole role of the caller, checked by {@code isAuthorizedRole}
     * @throws RestException BAD_REQUEST for missing/invalid input, unknown tenant or namespace,
     *         an already existing component or a failed admission check; UNAUTHORIZED when the
     *         caller is not allowed; INTERNAL_SERVER_ERROR for admin-client or packaging failures
     */
    public void registerFunction(String tenant, String namespace, String componentName, InputStream uploadedInputStream, FormDataContentDisposition fileDetail, String functionPkgUrl, String functionDetailsJson, String componentConfigJson, String clientRole) {
        if (!this.isWorkerServiceAvailable()) {
            this.throwUnavailableException();
        }
        if (tenant == null) {
            throw new RestException(Response.Status.BAD_REQUEST, "Tenant is not provided");
        }
        if (namespace == null) {
            throw new RestException(Response.Status.BAD_REQUEST, "Namespace is not provided");
        }
        if (componentName == null) {
            throw new RestException(Response.Status.BAD_REQUEST, this.componentType + " Name is not provided");
        }
        try {
            // The result is not needed; the call is made for its failure modes
            // (NotFoundException / NotAuthorizedException), which are handled below.
            this.worker().getBrokerAdmin().tenants().getTenantInfo(tenant);
            String qualifiedNamespace = tenant + "/" + namespace;
            List<String> namespaces = this.worker().getBrokerAdmin().namespaces().getNamespaces(tenant);
            if (namespaces != null && !namespaces.contains(qualifiedNamespace)) {
                // Fall back to the tenant/cluster/namespace form before rejecting.
                String qualifiedNamespaceWithCluster = String.format("%s/%s/%s", tenant, this.worker().getWorkerConfig().getPulsarFunctionsCluster(), namespace);
                if (!namespaces.contains(qualifiedNamespaceWithCluster)) {
                    log.error("{}/{}/{} Namespace {} does not exist", tenant, namespace, componentName, namespace);
                    throw new RestException(Response.Status.BAD_REQUEST, "Namespace does not exist");
                }
            }
        }
        catch (PulsarAdminException.NotAuthorizedException e) {
            log.error("{}/{}/{} Client [{}] is not admin and authorized to operate {} on tenant", tenant, namespace, componentName, clientRole, this.componentType);
            throw new RestException(Response.Status.UNAUTHORIZED, "client is not authorize to perform operation");
        }
        catch (PulsarAdminException.NotFoundException e) {
            log.error("{}/{}/{} Tenant {} does not exist", tenant, namespace, componentName, tenant);
            throw new RestException(Response.Status.BAD_REQUEST, "Tenant does not exist");
        }
        catch (PulsarAdminException e) {
            log.error("{}/{}/{} Issues getting tenant data", tenant, namespace, componentName, e);
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
        }
        try {
            if (!this.isAuthorizedRole(tenant, clientRole)) {
                log.error("{}/{}/{} Client [{}] is not admin and authorized to register {}", tenant, namespace, componentName, clientRole, this.componentType);
                throw new RestException(Response.Status.UNAUTHORIZED, "client is not authorize to perform operation");
            }
        }
        catch (PulsarAdminException e) {
            log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, componentName, e);
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
        }
        FunctionMetaDataManager functionMetaDataManager = this.worker().getFunctionMetaDataManager();
        if (functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
            log.error("{} {}/{}/{} already exists", this.componentType, tenant, namespace, componentName);
            throw new RestException(Response.Status.BAD_REQUEST, String.format("%s %s already exists", this.componentType, componentName));
        }
        boolean isPkgUrlProvided = StringUtils.isNotBlank(functionPkgUrl);
        File uploadedInputStreamAsFile = null;
        if (uploadedInputStream != null) {
            uploadedInputStreamAsFile = ComponentImpl.dumpToTmpFile(uploadedInputStream);
        }
        Function.FunctionDetails functionDetails;
        try {
            functionDetails = isPkgUrlProvided
                    ? this.validateUpdateRequestParamsWithPkgUrl(tenant, namespace, componentName, functionPkgUrl, functionDetailsJson, componentConfigJson, this.componentType)
                    : this.validateUpdateRequestParams(tenant, namespace, componentName, uploadedInputStreamAsFile, fileDetail, functionDetailsJson, componentConfigJson, this.componentType);
        }
        catch (Exception e) {
            log.error("Invalid register {} request @ /{}/{}/{}", this.componentType, tenant, namespace, componentName, e);
            throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
        }
        try {
            this.worker().getFunctionRuntimeManager().getRuntimeFactory().doAdmissionChecks(functionDetails);
        }
        catch (Exception e) {
            // Pass the exception as the trailing argument so its stack trace is logged.
            log.error("{} {}/{}/{} cannot be admitted by the runtime factory", this.componentType, tenant, namespace, componentName, e);
            throw new RestException(Response.Status.BAD_REQUEST, String.format("%s %s cannot be admitted:- %s", this.componentType, componentName, e.getMessage()));
        }
        Function.FunctionMetaData.Builder functionMetaDataBuilder = Function.FunctionMetaData.newBuilder().setFunctionDetails(functionDetails).setCreateTime(System.currentTimeMillis()).setVersion(0L);
        Function.PackageLocationMetaData.Builder packageLocationMetaDataBuilder;
        try {
            packageLocationMetaDataBuilder = this.getFunctionPackageLocation(functionDetails, functionPkgUrl, fileDetail, uploadedInputStreamAsFile);
        }
        catch (Exception e) {
            // Only the message reaches the client; keep the full failure in the worker log.
            log.error("Failed to resolve package location for {} {}/{}/{}", this.componentType, tenant, namespace, componentName, e);
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
        }
        functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
        this.updateRequest(functionMetaDataBuilder.build());
    }

    /**
     * Builds the package-location metadata for a component, uploading the package when required.
     *
     * <p>When the runtime factory is not externally managed, a builtin component is recorded as
     * {@code builtin://<name>} and a package URL is recorded verbatim, both without any upload.
     * In every other case a file is uploaded under a path produced by {@code createPackagePath}:
     * the builtin connector archive, the file extracted from the package URL, or the file
     * received with the request.
     *
     * @param functionDetails details supplying tenant, namespace, name and builtin information
     * @param functionPkgUrl package URL; considered provided when non-blank
     * @param fileDetail multipart details of the uploaded file; its file name is used when the
     *        uploaded file is the package
     * @param uploadedInputStreamAsFile uploaded package already written to a local file
     * @return the populated builder
     * @throws Exception whatever the lookup, extraction or upload helpers throw
     */
    private Function.PackageLocationMetaData.Builder getFunctionPackageLocation(Function.FunctionDetails functionDetails, String functionPkgUrl, FormDataContentDisposition fileDetail, File uploadedInputStreamAsFile) throws Exception {
        final String tenant = functionDetails.getTenant();
        final String namespace = functionDetails.getNamespace();
        final String componentName = functionDetails.getName();
        final Function.PackageLocationMetaData.Builder location = Function.PackageLocationMetaData.newBuilder();
        final boolean builtin = this.isFunctionCodeBuiltin(functionDetails);
        final boolean hasPkgUrl = StringUtils.isNotBlank(functionPkgUrl);
        final boolean externallyManaged = this.worker().getFunctionRuntimeManager().getRuntimeFactory().externallyManaged();

        // Locally managed runtimes can reference builtins and URLs directly: nothing to upload.
        if (!externallyManaged && builtin) {
            location.setPackagePath("builtin://" + this.getFunctionCodeBuiltin(functionDetails));
            return location;
        }
        if (!externallyManaged && hasPkgUrl) {
            location.setPackagePath(functionPkgUrl);
            return location;
        }

        // Everything else ends in an upload; work out which file and under which name.
        final File packageFile;
        final String originalFileName;
        if (builtin) {
            packageFile = this.componentType.equals(Utils.ComponentType.SOURCE)
                    ? this.worker().getConnectorsManager().getSourceArchive(functionDetails.getSource().getBuiltin()).toFile()
                    : this.worker().getConnectorsManager().getSinkArchive(functionDetails.getSink().getBuiltin()).toFile();
            originalFileName = packageFile.getName();
        } else if (hasPkgUrl) {
            packageFile = org.apache.pulsar.functions.utils.Utils.extractFileFromPkg(functionPkgUrl);
            originalFileName = packageFile.getName();
        } else {
            packageFile = uploadedInputStreamAsFile;
            originalFileName = fileDetail.getFileName();
        }

        location.setPackagePath(ComponentImpl.createPackagePath(tenant, namespace, componentName, originalFileName));
        location.setOriginalFileName(originalFileName);
        log.info("Uploading {} package to {}", this.componentType, location.getPackagePath());
        Utils.uploadFileToBookkeeper(location.getPackagePath(), packageFile, this.worker().getDlogNamespace());
        return location;
    }

    /**
     * Updates an existing component (function, source or sink, per {@code componentType}).
     *
     * <p>The stored details are converted back to the matching config type, merged with the
     * submitted config through the type-specific {@code validateUpdate}, and the merged config is
     * then validated and submitted. Exactly one of the function / source / sink merge paths is
     * taken; the sink path is the fallback for any type that is neither FUNCTION nor SOURCE.
     *
     * @param tenant tenant of the component; must not be {@code null}
     * @param namespace namespace of the component; must not be {@code null}
     * @param componentName name of the component; must not be {@code null}
     * @param uploadedInputStream new package contents, or {@code null} to keep the current package
     * @param fileDetail multipart details of the uploaded package
     * @param functionPkgUrl new package URL; when non-blank it takes precedence over the upload
     * @param functionDetailsJson serialized function details, passed to validation
     * @param componentConfigJson serialized config holding the requested changes
     * @param clientRole role of the caller, checked by {@code isAuthorizedRole}
     * @throws RestException BAD_REQUEST for missing/invalid input, an unknown component, an
     *         update without changes or a failed admission check; UNAUTHORIZED when the caller
     *         is not allowed; INTERNAL_SERVER_ERROR for authorization or packaging failures
     */
    public void updateFunction(String tenant, String namespace, String componentName, InputStream uploadedInputStream, FormDataContentDisposition fileDetail, String functionPkgUrl, String functionDetailsJson, String componentConfigJson, String clientRole) {
        if (!this.isWorkerServiceAvailable()) {
            this.throwUnavailableException();
        }
        if (tenant == null) {
            throw new RestException(Response.Status.BAD_REQUEST, "Tenant is not provided");
        }
        if (namespace == null) {
            throw new RestException(Response.Status.BAD_REQUEST, "Namespace is not provided");
        }
        if (componentName == null) {
            throw new RestException(Response.Status.BAD_REQUEST, this.componentType + " Name is not provided");
        }
        try {
            if (!this.isAuthorizedRole(tenant, clientRole)) {
                log.error("{}/{}/{} Client [{}] is not admin and authorized to update {}", tenant, namespace, componentName, clientRole, this.componentType);
                throw new RestException(Response.Status.UNAUTHORIZED, this.componentType + "client is not authorize to perform operation");
            }
        }
        catch (PulsarAdminException e) {
            log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, componentName, e);
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
        }
        FunctionMetaDataManager functionMetaDataManager = this.worker().getFunctionMetaDataManager();
        if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
            throw new RestException(Response.Status.BAD_REQUEST, String.format("%s %s doesn't exist", this.componentType, componentName));
        }
        Function.FunctionMetaData existingComponent = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);

        // The three merge paths are mutually exclusive. They must stay an if / else-if / else
        // chain: running the sink path after the function or source path would overwrite the
        // correctly merged config with one derived from sink semantics.
        Gson gson = new Gson();
        String existingComponentConfigJson;
        String mergedComponentConfigJson;
        if (this.componentType.equals(Utils.ComponentType.FUNCTION)) {
            FunctionConfig existingFunctionConfig = FunctionConfigUtils.convertFromDetails(existingComponent.getFunctionDetails());
            existingComponentConfigJson = gson.toJson(existingFunctionConfig);
            FunctionConfig functionConfig = gson.fromJson(componentConfigJson, FunctionConfig.class);
            // The path parameters are authoritative for the component identity.
            functionConfig.setTenant(tenant);
            functionConfig.setNamespace(namespace);
            functionConfig.setName(componentName);
            try {
                mergedComponentConfigJson = gson.toJson(FunctionConfigUtils.validateUpdate(existingFunctionConfig, functionConfig));
            }
            catch (Exception e) {
                throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
            }
        } else if (this.componentType.equals(Utils.ComponentType.SOURCE)) {
            SourceConfig existingSourceConfig = SourceConfigUtils.convertFromDetails(existingComponent.getFunctionDetails());
            existingComponentConfigJson = gson.toJson(existingSourceConfig);
            SourceConfig sourceConfig = gson.fromJson(componentConfigJson, SourceConfig.class);
            sourceConfig.setTenant(tenant);
            sourceConfig.setNamespace(namespace);
            sourceConfig.setName(componentName);
            try {
                mergedComponentConfigJson = gson.toJson(SourceConfigUtils.validateUpdate(existingSourceConfig, sourceConfig));
            }
            catch (Exception e) {
                throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
            }
        } else {
            SinkConfig existingSinkConfig = SinkConfigUtils.convertFromDetails(existingComponent.getFunctionDetails());
            existingComponentConfigJson = gson.toJson(existingSinkConfig);
            SinkConfig sinkConfig = gson.fromJson(componentConfigJson, SinkConfig.class);
            sinkConfig.setTenant(tenant);
            sinkConfig.setNamespace(namespace);
            sinkConfig.setName(componentName);
            try {
                mergedComponentConfigJson = gson.toJson(SinkConfigUtils.validateUpdate(existingSinkConfig, sinkConfig));
            }
            catch (Exception e) {
                throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
            }
        }

        if (existingComponentConfigJson.equals(mergedComponentConfigJson) && StringUtils.isBlank(functionPkgUrl) && uploadedInputStream == null) {
            log.error("{}/{}/{} Update contains no changes", tenant, namespace, componentName);
            throw new RestException(Response.Status.BAD_REQUEST, "Update contains no change");
        }
        File uploadedInputStreamAsFile = null;
        if (uploadedInputStream != null) {
            uploadedInputStreamAsFile = ComponentImpl.dumpToTmpFile(uploadedInputStream);
        }
        Function.FunctionDetails functionDetails;
        try {
            if (StringUtils.isNotBlank(functionPkgUrl)) {
                functionDetails = this.validateUpdateRequestParamsWithPkgUrl(tenant, namespace, componentName, functionPkgUrl, functionDetailsJson, mergedComponentConfigJson, this.componentType);
            } else if (uploadedInputStream != null) {
                functionDetails = this.validateUpdateRequestParams(tenant, namespace, componentName, uploadedInputStreamAsFile, fileDetail, functionDetailsJson, mergedComponentConfigJson, this.componentType);
            } else if (existingComponent.getPackageLocation().getPackagePath().startsWith("builtin://")) {
                // Builtin packages carry no file, so validate without one.
                functionDetails = this.validateUpdateRequestParams(tenant, namespace, componentName, null, null, functionDetailsJson, mergedComponentConfigJson, this.componentType);
            } else {
                functionDetails = this.validateUpdateRequestParamsWithExistingMetadata(tenant, namespace, componentName, existingComponent.getPackageLocation(), mergedComponentConfigJson, this.componentType);
            }
        }
        catch (Exception e) {
            log.error("Invalid update {} request @ /{}/{}/{}", this.componentType, tenant, namespace, componentName, e);
            throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
        }
        try {
            this.worker().getFunctionRuntimeManager().getRuntimeFactory().doAdmissionChecks(functionDetails);
        }
        catch (Exception e) {
            log.error("Updated {} {}/{}/{} cannot be submitted to runtime factory", this.componentType, tenant, namespace, componentName);
            throw new RestException(Response.Status.BAD_REQUEST, String.format("%s %s cannot be admitted:- %s", this.componentType, componentName, e.getMessage()));
        }
        Function.FunctionMetaData.Builder functionMetaDataBuilder = Function.FunctionMetaData.newBuilder().setFunctionDetails(functionDetails).setCreateTime(System.currentTimeMillis()).setVersion(0L);
        Function.PackageLocationMetaData.Builder packageLocationMetaDataBuilder;
        if (StringUtils.isNotBlank(functionPkgUrl) || uploadedInputStreamAsFile != null) {
            try {
                packageLocationMetaDataBuilder = this.getFunctionPackageLocation(functionDetails, functionPkgUrl, fileDetail, uploadedInputStreamAsFile);
            }
            catch (Exception e) {
                // Only the message reaches the client; keep the full failure in the worker log.
                log.error("Failed to resolve package location for {} {}/{}/{}", this.componentType, tenant, namespace, componentName, e);
                throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
            }
        } else {
            // No new package supplied: carry the stored location over unchanged.
            packageLocationMetaDataBuilder = Function.PackageLocationMetaData.newBuilder().mergeFrom(existingComponent.getPackageLocation());
        }
        functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
        this.updateRequest(functionMetaDataBuilder.build());
    }

    /**
     * Deregisters a component (function, source or sink, per {@code componentType}).
     *
     * <p>After the authorization check the component's state table is deleted (when a state
     * store admin client is configured), the request is validated, the component is looked up
     * and checked to be of this handler's type, and the deregistration is submitted and awaited.
     *
     * @param tenant tenant of the component
     * @param namespace namespace of the component
     * @param componentName name of the component
     * @param clientRole role of the caller, checked by {@code isAuthorizedRole}
     * @throws RestException UNAUTHORIZED when the caller is not allowed; BAD_REQUEST for invalid
     *         parameters or an unsuccessful request result; NOT_FOUND when the component does not
     *         exist or has another type; REQUEST_TIMEOUT when interrupted while waiting;
     *         INTERNAL_SERVER_ERROR for authorization, state-deletion or execution failures
     */
    public void deregisterFunction(String tenant, String namespace, String componentName, String clientRole) {
        if (!this.isWorkerServiceAvailable()) {
            this.throwUnavailableException();
        }
        try {
            if (!this.isAuthorizedRole(tenant, clientRole)) {
                log.error("{}/{}/{} Client [{}] is not admin and authorized to deregister {}", tenant, namespace, componentName, clientRole, this.componentType);
                throw new RestException(Response.Status.UNAUTHORIZED, "client is not authorize to perform operation");
            }
        }
        catch (PulsarAdminException e) {
            log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, componentName, e);
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
        }
        // NOTE(review): state is deleted before the request is validated and before the
        // component is known to exist -- confirm this ordering is intended.
        if (null != this.worker().getStateStoreAdminClient()) {
            String tableNs = StateUtils.getStateNamespace(tenant, namespace);
            String tableName = componentName;
            try {
                FutureUtils.result(this.worker().getStateStoreAdminClient().deleteStream(tableNs, tableName));
            }
            catch (NamespaceNotFoundException | StreamNotFoundException ignored) {
                // Nothing to delete: the component never created a state table.
            }
            catch (Exception e) {
                // Supply the three identifiers so the placeholders are actually filled in.
                log.error("{}/{}/{} Failed to delete state table", tenant, namespace, componentName, e);
                throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
            }
        }
        try {
            this.validateDeregisterRequestParams(tenant, namespace, componentName, this.componentType);
        }
        catch (IllegalArgumentException e) {
            log.error("Invalid deregister {} request @ /{}/{}/{}", this.componentType, tenant, namespace, componentName, e);
            throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
        }
        FunctionMetaDataManager functionMetaDataManager = this.worker().getFunctionMetaDataManager();
        if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
            log.error("{} to deregister does not exist @ /{}/{}/{}", this.componentType, tenant, namespace, componentName);
            throw new RestException(Response.Status.NOT_FOUND, String.format("%s %s doesn't exist", this.componentType, componentName));
        }
        Function.FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
        if (!this.calculateSubjectType(functionMetaData).equals(this.componentType)) {
            log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, this.componentType);
            throw new RestException(Response.Status.NOT_FOUND, String.format("%s %s doesn't exist", this.componentType, componentName));
        }
        CompletableFuture<RequestResult> completableFuture = functionMetaDataManager.deregisterFunction(tenant, namespace, componentName);
        try {
            RequestResult requestResult = completableFuture.get();
            if (!requestResult.isSuccess()) {
                throw new RestException(Response.Status.BAD_REQUEST, requestResult.getMessage());
            }
        }
        catch (ExecutionException e) {
            log.error("Execution Exception while deregistering {} @ /{}/{}/{}", this.componentType, tenant, namespace, componentName, e);
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getCause().getMessage());
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            log.error("Interrupted Exception while deregistering {} @ /{}/{}/{}", this.componentType, tenant, namespace, componentName, e);
            throw new RestException(Response.Status.REQUEST_TIMEOUT, e.getMessage());
        }
    }

    /**
     * Looks up a component and returns its stored details converted to a {@link FunctionConfig}.
     *
     * @param tenant tenant of the component
     * @param namespace namespace of the component
     * @param componentName name of the component
     * @return the config derived from the stored function details
     * @throws RestException BAD_REQUEST when parameter validation fails; NOT_FOUND when the
     *         component does not exist or is of a different type than this handler
     */
    public FunctionConfig getFunctionInfo(String tenant, String namespace, String componentName) {
        if (!this.isWorkerServiceAvailable()) {
            this.throwUnavailableException();
        }
        try {
            this.validateGetFunctionRequestParams(tenant, namespace, componentName, this.componentType);
        }
        catch (IllegalArgumentException invalid) {
            log.error("Invalid get {} request @ /{}/{}/{}", this.componentType, tenant, namespace, componentName, invalid);
            throw new RestException(Response.Status.BAD_REQUEST, invalid.getMessage());
        }

        final FunctionMetaDataManager metaDataManager = this.worker().getFunctionMetaDataManager();
        final boolean registered = metaDataManager.containsFunction(tenant, namespace, componentName);
        if (!registered) {
            log.error("{} does not exist @ /{}/{}/{}", this.componentType, tenant, namespace, componentName);
            throw new RestException(Response.Status.NOT_FOUND, String.format(this.componentType + " %s doesn't exist", componentName));
        }

        final Function.FunctionMetaData metaData = metaDataManager.getFunctionMetaData(tenant, namespace, componentName);
        final boolean sameType = this.calculateSubjectType(metaData).equals(this.componentType);
        if (!sameType) {
            // A component of another type under this name is reported as missing.
            log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, this.componentType);
            throw new RestException(Response.Status.NOT_FOUND, String.format(this.componentType + " %s doesn't exist", componentName));
        }
        return FunctionConfigUtils.convertFromDetails(metaData.getFunctionDetails());
    }

    /**
     * Stops one instance of a component by delegating to
     * {@code changeFunctionInstanceStatus} with {@code start = false}.
     *
     * @param tenant tenant of the component
     * @param namespace namespace of the component
     * @param componentName name of the component
     * @param instanceId id of the instance to stop
     * @param uri request URI, forwarded unchanged
     */
    public void stopFunctionInstance(String tenant, String namespace, String componentName, String instanceId, URI uri) {
        final boolean start = false;
        this.changeFunctionInstanceStatus(tenant, namespace, componentName, instanceId, start, uri);
    }

    /**
     * Starts one instance of a component by delegating to
     * {@code changeFunctionInstanceStatus} with {@code start = true}.
     *
     * @param tenant tenant of the component
     * @param namespace namespace of the component
     * @param componentName name of the component
     * @param instanceId id of the instance to start
     * @param uri request URI, forwarded unchanged
     */
    public void startFunctionInstance(String tenant, String namespace, String componentName, String instanceId, URI uri) {
        final boolean start = true;
        this.changeFunctionInstanceStatus(tenant, namespace, componentName, instanceId, start, uri);
    }

    /**
     * Starts or stops a single instance of a component.
     *
     * @param tenant tenant of the component
     * @param namespace namespace of the component
     * @param componentName name of the component
     * @param instanceId instance id as received from the request; must parse as an integer
     * @param start {@code true} to move the instance to RUNNING, {@code false} to STOPPED
     * @param uri request URI; not used by this method
     * @throws RestException BAD_REQUEST when validation fails, the instance id is not an integer
     *         or the state change is not permitted; NOT_FOUND when the component does not exist
     *         or has another type; INTERNAL_SERVER_ERROR when the state change itself fails
     * @throws WebApplicationException rethrown unchanged when raised by the metadata manager
     */
    public void changeFunctionInstanceStatus(String tenant, String namespace, String componentName, String instanceId, boolean start, URI uri) {
        if (!this.isWorkerServiceAvailable()) {
            this.throwUnavailableException();
        }
        try {
            this.validateGetFunctionInstanceRequestParams(tenant, namespace, componentName, this.componentType, instanceId);
        }
        catch (IllegalArgumentException e) {
            log.error("Invalid start/stop {} request @ /{}/{}/{}", this.componentType, tenant, namespace, componentName, e);
            throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
        }
        FunctionMetaDataManager functionMetaDataManager = this.worker().getFunctionMetaDataManager();
        if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
            log.error("{} does not exist @ /{}/{}/{}", this.componentType, tenant, namespace, componentName);
            throw new RestException(Response.Status.NOT_FOUND, String.format("%s %s doesn't exist", this.componentType, componentName));
        }
        Function.FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
        if (!this.calculateSubjectType(functionMetaData).equals(this.componentType)) {
            log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, this.componentType);
            throw new RestException(Response.Status.NOT_FOUND, String.format("%s %s doesn't exist", this.componentType, componentName));
        }
        // Parse once, and report a malformed id as a client error instead of letting
        // NumberFormatException escape as an unhandled runtime exception.
        final int parsedInstanceId;
        try {
            parsedInstanceId = Integer.parseInt(instanceId);
        }
        catch (NumberFormatException e) {
            log.error("Invalid start/stop {} request @ /{}/{}/{}", this.componentType, tenant, namespace, componentName, e);
            throw new RestException(Response.Status.BAD_REQUEST, String.format("%s Instance Id %s is not a valid integer", this.componentType, instanceId));
        }
        Function.FunctionState targetState = start ? Function.FunctionState.RUNNING : Function.FunctionState.STOPPED;
        if (!functionMetaDataManager.canChangeState(functionMetaData, parsedInstanceId, targetState)) {
            log.error("Operation not permitted on {}/{}/{}", tenant, namespace, componentName);
            throw new RestException(Response.Status.BAD_REQUEST, "Operation not permitted");
        }
        try {
            functionMetaDataManager.changeFunctionInstanceStatus(tenant, namespace, componentName, parsedInstanceId, start);
        }
        catch (WebApplicationException we) {
            throw we;
        }
        catch (Exception e) {
            log.error("Failed to start/stop {}: {}/{}/{}/{}", this.componentType, tenant, namespace, componentName, instanceId, e);
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
        }
    }

    /**
     * Restarts a single instance of a component through the function runtime manager.
     *
     * @param tenant tenant of the component
     * @param namespace namespace of the component
     * @param componentName name of the component
     * @param instanceId instance id as received from the request; must parse as an integer
     * @param uri request URI, forwarded to the runtime manager
     * @throws RestException BAD_REQUEST when validation fails or the instance id is not an
     *         integer; NOT_FOUND when the component does not exist or has another type;
     *         INTERNAL_SERVER_ERROR when the restart itself fails
     * @throws WebApplicationException rethrown unchanged when raised by the runtime manager
     */
    public void restartFunctionInstance(String tenant, String namespace, String componentName, String instanceId, URI uri) {
        if (!this.isWorkerServiceAvailable()) {
            this.throwUnavailableException();
        }
        try {
            this.validateGetFunctionInstanceRequestParams(tenant, namespace, componentName, this.componentType, instanceId);
        }
        catch (IllegalArgumentException e) {
            log.error("Invalid restart {} request @ /{}/{}/{}", this.componentType, tenant, namespace, componentName, e);
            throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
        }
        FunctionMetaDataManager functionMetaDataManager = this.worker().getFunctionMetaDataManager();
        if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
            log.error("{} does not exist @ /{}/{}/{}", this.componentType, tenant, namespace, componentName);
            throw new RestException(Response.Status.NOT_FOUND, String.format("%s %s doesn't exist", this.componentType, componentName));
        }
        Function.FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
        if (!this.calculateSubjectType(functionMetaData).equals(this.componentType)) {
            log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, this.componentType);
            throw new RestException(Response.Status.NOT_FOUND, String.format("%s %s doesn't exist", this.componentType, componentName));
        }
        FunctionRuntimeManager functionRuntimeManager = this.worker().getFunctionRuntimeManager();
        // A malformed id is the caller's mistake: report it as BAD_REQUEST instead of letting
        // the generic handler below turn it into INTERNAL_SERVER_ERROR.
        final int parsedInstanceId;
        try {
            parsedInstanceId = Integer.parseInt(instanceId);
        }
        catch (NumberFormatException e) {
            log.error("Invalid restart {} request @ /{}/{}/{}", this.componentType, tenant, namespace, componentName, e);
            throw new RestException(Response.Status.BAD_REQUEST, String.format("%s Instance Id %s is not a valid integer", this.componentType, instanceId));
        }
        try {
            functionRuntimeManager.restartFunctionInstance(tenant, namespace, componentName, parsedInstanceId, uri);
        }
        catch (WebApplicationException we) {
            throw we;
        }
        catch (Exception e) {
            log.error("Failed to restart {}: {}/{}/{}/{}", this.componentType, tenant, namespace, componentName, instanceId, e);
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
        }
    }

    /**
     * Requests that all instances of the named component be stopped.
     *
     * <p>Thin wrapper around {@link #changeFunctionStatusAllInstances} with {@code start == false}.
     */
    public void stopFunctionInstances(String tenant, String namespace, String componentName) {
        final boolean start = false;
        this.changeFunctionStatusAllInstances(tenant, namespace, componentName, start);
    }

    /**
     * Requests that all instances of the named component be started.
     *
     * <p>Thin wrapper around {@link #changeFunctionStatusAllInstances} with {@code start == true}.
     */
    public void startFunctionInstances(String tenant, String namespace, String componentName) {
        final boolean start = true;
        this.changeFunctionStatusAllInstances(tenant, namespace, componentName, start);
    }

    /**
     * Starts or stops every instance of the named component.
     *
     * <p>The metadata manager is addressed with instance id {@code -1}, which this method uses
     * for both the permission check and the status change.
     *
     * @param tenant        tenant the component belongs to
     * @param namespace     namespace the component belongs to
     * @param componentName name of the function/source/sink
     * @param start         {@code true} to move to {@code RUNNING}, {@code false} to move to {@code STOPPED}
     * @throws RestException {@code BAD_REQUEST} for invalid parameters or a disallowed state change,
     *                       {@code NOT_FOUND} for an unknown or differently-typed component,
     *                       {@code INTERNAL_SERVER_ERROR} if the status change itself fails
     */
    public void changeFunctionStatusAllInstances(String tenant, String namespace, String componentName, boolean start) {
        if (!this.isWorkerServiceAvailable()) {
            this.throwUnavailableException();
        }
        try {
            this.validateGetFunctionRequestParams(tenant, namespace, componentName, this.componentType);
        } catch (IllegalArgumentException e) {
            log.error("Invalid start/stop {} request @ /{}/{}/{}", new Object[]{this.componentType, tenant, namespace, componentName, e});
            throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
        }
        FunctionMetaDataManager functionMetaDataManager = this.worker().getFunctionMetaDataManager();
        if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
            // Name the operation that was actually requested; this path serves both start and stop.
            String operation = start ? "startFunctionInstances" : "stopFunctionInstances";
            log.error("{} in {} does not exist @ /{}/{}/{}", new Object[]{this.componentType, operation, tenant, namespace, componentName});
            throw new RestException(Response.Status.NOT_FOUND, String.format("%s %s doesn't exist", this.componentType, componentName));
        }
        Function.FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
        if (!this.calculateSubjectType(functionMetaData).equals((Object)this.componentType)) {
            log.error("{}/{}/{} is not a {}", new Object[]{tenant, namespace, componentName, this.componentType});
            throw new RestException(Response.Status.NOT_FOUND, String.format("%s %s doesn't exist", this.componentType, componentName));
        }
        Function.FunctionState targetState = start ? Function.FunctionState.RUNNING : Function.FunctionState.STOPPED;
        if (!functionMetaDataManager.canChangeState(functionMetaData, -1, targetState)) {
            log.error("Operation not permitted on {}/{}/{}", new Object[]{tenant, namespace, componentName});
            throw new RestException(Response.Status.BAD_REQUEST, "Operation not permitted");
        }
        try {
            functionMetaDataManager.changeFunctionInstanceStatus(tenant, namespace, componentName, -1, start);
        } catch (WebApplicationException we) {
            // Already carries an HTTP status; pass it through untouched.
            throw we;
        } catch (Exception e) {
            log.error("Failed to start/stop {}: {}/{}/{}", new Object[]{this.componentType, tenant, namespace, componentName, e});
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
        }
    }

    /**
     * Restarts every instance of the named component through the runtime manager.
     *
     * @param tenant        tenant the component belongs to
     * @param namespace     namespace the component belongs to
     * @param componentName name of the function/source/sink
     * @throws RestException {@code BAD_REQUEST} for invalid parameters, {@code NOT_FOUND} for an
     *                       unknown or differently-typed component, {@code INTERNAL_SERVER_ERROR}
     *                       if the restart itself fails
     */
    public void restartFunctionInstances(String tenant, String namespace, String componentName) {
        if (!this.isWorkerServiceAvailable()) {
            this.throwUnavailableException();
        }
        try {
            this.validateGetFunctionRequestParams(tenant, namespace, componentName, this.componentType);
        } catch (IllegalArgumentException e) {
            log.error("Invalid restart {} request @ /{}/{}/{}", new Object[]{this.componentType, tenant, namespace, componentName, e});
            throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
        }
        FunctionMetaDataManager functionMetaDataManager = this.worker().getFunctionMetaDataManager();
        if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
            // The log previously named stopFunctionInstances here, which was a copy-paste slip.
            log.error("{} in restartFunctionInstances does not exist @ /{}/{}/{}", new Object[]{this.componentType, tenant, namespace, componentName});
            throw new RestException(Response.Status.NOT_FOUND, String.format("%s %s doesn't exist", this.componentType, componentName));
        }
        Function.FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
        if (!this.calculateSubjectType(functionMetaData).equals((Object)this.componentType)) {
            log.error("{}/{}/{} is not a {}", new Object[]{tenant, namespace, componentName, this.componentType});
            throw new RestException(Response.Status.NOT_FOUND, String.format("%s %s doesn't exist", this.componentType, componentName));
        }
        FunctionRuntimeManager functionRuntimeManager = this.worker().getFunctionRuntimeManager();
        try {
            functionRuntimeManager.restartFunctionInstances(tenant, namespace, componentName);
        } catch (WebApplicationException we) {
            // Already carries an HTTP status; pass it through untouched.
            throw we;
        } catch (Exception e) {
            log.error("Failed to restart {}: {}/{}/{}", new Object[]{this.componentType, tenant, namespace, componentName, e});
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
        }
    }

    /**
     * Returns the statistics of the named component as reported by the runtime manager.
     *
     * @param tenant        tenant the component belongs to
     * @param namespace     namespace the component belongs to
     * @param componentName name of the function/source/sink
     * @param uri           request URI, handed through to the runtime manager
     * @return the stats object produced by {@code FunctionRuntimeManager.getFunctionStats}
     * @throws RestException {@code SERVICE_UNAVAILABLE} while the worker is not available,
     *                       {@code BAD_REQUEST} for invalid parameters, {@code NOT_FOUND} for an
     *                       unknown or differently-typed component, {@code INTERNAL_SERVER_ERROR}
     *                       if fetching the stats fails
     */
    public FunctionStats getFunctionStats(String tenant, String namespace, String componentName, URI uri) {
        if (!this.isWorkerServiceAvailable()) {
            throw new RestException(Response.Status.SERVICE_UNAVAILABLE, "Function worker service is not done initializing. Please try again in a little while.");
        }

        try {
            this.validateGetFunctionRequestParams(tenant, namespace, componentName, this.componentType);
        } catch (IllegalArgumentException invalid) {
            log.error("Invalid get {} Stats request @ /{}/{}/{}", this.componentType, tenant, namespace, componentName, invalid);
            throw new RestException(Response.Status.BAD_REQUEST, invalid.getMessage());
        }

        FunctionMetaDataManager metaDataManager = this.worker().getFunctionMetaDataManager();
        boolean registered = metaDataManager.containsFunction(tenant, namespace, componentName);
        if (!registered) {
            log.error("{} in get {} Stats does not exist @ /{}/{}/{}", this.componentType, this.componentType, tenant, namespace, componentName);
            throw new RestException(Response.Status.NOT_FOUND, String.format("%s %s doesn't exist", this.componentType, componentName));
        }

        // The name exists, but it may belong to a different kind of component.
        Function.FunctionMetaData metaData = metaDataManager.getFunctionMetaData(tenant, namespace, componentName);
        boolean typeMatches = this.calculateSubjectType(metaData).equals(this.componentType);
        if (!typeMatches) {
            log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, this.componentType);
            throw new RestException(Response.Status.NOT_FOUND, String.format("%s %s doesn't exist", this.componentType, componentName));
        }

        FunctionRuntimeManager runtimeManager = this.worker().getFunctionRuntimeManager();
        try {
            return runtimeManager.getFunctionStats(tenant, namespace, componentName, uri);
        } catch (WebApplicationException passThrough) {
            throw passThrough;
        } catch (Exception failure) {
            log.error("{}/{}/{} Got Exception Getting Stats", tenant, namespace, componentName, failure);
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, failure.getMessage());
        }
    }

    /**
     * Returns the statistics of one instance of the named component.
     *
     * <p>The instance id must parse as an integer in {@code [0, parallelism)}, where parallelism
     * is read from the component's stored function details; otherwise {@code BAD_REQUEST} is
     * returned.
     *
     * @param tenant        tenant the component belongs to
     * @param namespace     namespace the component belongs to
     * @param componentName name of the function/source/sink
     * @param instanceId    decimal instance index, as received from the REST layer
     * @param uri           request URI, handed through to the runtime manager
     * @return the per-instance stats produced by {@code FunctionRuntimeManager.getFunctionInstanceStats}
     * @throws RestException {@code SERVICE_UNAVAILABLE} while the worker is not available,
     *                       {@code BAD_REQUEST} for invalid parameters or instance id,
     *                       {@code NOT_FOUND} for an unknown or differently-typed component,
     *                       {@code INTERNAL_SERVER_ERROR} if fetching the stats fails
     */
    public FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData getFunctionsInstanceStats(String tenant, String namespace, String componentName, String instanceId, URI uri) {
        FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData functionInstanceStatsData;
        if (!this.isWorkerServiceAvailable()) {
            throw new RestException(Response.Status.SERVICE_UNAVAILABLE, "Function worker service is not done initializing. Please try again in a little while.");
        }
        try {
            this.validateGetFunctionInstanceRequestParams(tenant, namespace, componentName, this.componentType, instanceId);
        } catch (IllegalArgumentException e) {
            log.error("Invalid get {} Stats request @ /{}/{}/{}", new Object[]{this.componentType, tenant, namespace, componentName, e});
            throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
        }
        FunctionMetaDataManager functionMetaDataManager = this.worker().getFunctionMetaDataManager();
        if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
            log.error("{} in get {} Stats does not exist @ /{}/{}/{}", new Object[]{this.componentType, this.componentType, tenant, namespace, componentName});
            throw new RestException(Response.Status.NOT_FOUND, String.format("%s %s doesn't exist", this.componentType, componentName));
        }
        Function.FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
        if (!this.calculateSubjectType(functionMetaData).equals((Object)this.componentType)) {
            log.error("{}/{}/{} is not a {}", new Object[]{tenant, namespace, componentName, this.componentType});
            throw new RestException(Response.Status.NOT_FOUND, String.format("%s %s doesn't exist", this.componentType, componentName));
        }
        // A non-numeric id used to escape as a raw NumberFormatException; treat it as a
        // client error, the same way an out-of-range id is treated below.
        int instanceIdInt;
        try {
            instanceIdInt = Integer.parseInt(instanceId);
        } catch (NumberFormatException e) {
            log.error("Invalid get {} Stats request @ /{}/{}/{}", new Object[]{this.componentType, tenant, namespace, componentName, e});
            throw new RestException(Response.Status.BAD_REQUEST, String.format("%s %s doesn't have instance with id %s", this.componentType, componentName, instanceId));
        }
        if (instanceIdInt < 0 || instanceIdInt >= functionMetaData.getFunctionDetails().getParallelism()) {
            log.error("instanceId in get {} Stats out of bounds @ /{}/{}/{}", new Object[]{this.componentType, tenant, namespace, componentName});
            throw new RestException(Response.Status.BAD_REQUEST, String.format("%s %s doesn't have instance with id %s", this.componentType, componentName, instanceId));
        }
        FunctionRuntimeManager functionRuntimeManager = this.worker().getFunctionRuntimeManager();
        try {
            functionInstanceStatsData = functionRuntimeManager.getFunctionInstanceStats(tenant, namespace, componentName, instanceIdInt, uri);
        } catch (WebApplicationException we) {
            // Already carries an HTTP status; pass it through untouched.
            throw we;
        } catch (Exception e) {
            log.error("{}/{}/{} Got Exception Getting Stats", new Object[]{tenant, namespace, componentName, e});
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
        }
        return functionInstanceStatsData;
    }

    /**
     * Lists the names of all components of this resource's component type in a namespace.
     *
     * <p>Entries returned by the metadata manager whose calculated subject type differs from
     * {@code this.componentType} are left out.
     *
     * @param tenant    tenant to list
     * @param namespace namespace to list
     * @return names taken from each matching entry's function details
     * @throws RestException {@code BAD_REQUEST} when tenant or namespace is missing
     */
    public List<String> listFunctions(String tenant, String namespace) {
        if (!this.isWorkerServiceAvailable()) {
            this.throwUnavailableException();
        }

        try {
            this.validateListFunctionRequestParams(tenant, namespace);
        } catch (IllegalArgumentException invalid) {
            log.error("Invalid list {} request @ /{}/{}", this.componentType, tenant, namespace, invalid);
            throw new RestException(Response.Status.BAD_REQUEST, invalid.getMessage());
        }

        Collection<Function.FunctionMetaData> candidates = this.worker().getFunctionMetaDataManager().listFunctions(tenant, namespace);
        LinkedList<String> names = new LinkedList<String>();
        for (Function.FunctionMetaData candidate : candidates) {
            boolean sameType = this.calculateSubjectType(candidate).equals(this.componentType);
            if (sameType) {
                names.add(candidate.getFunctionDetails().getName());
            }
        }
        return names;
    }

    /**
     * Submits updated metadata to the metadata manager and blocks until the request completes.
     *
     * @param functionMetaData metadata to store
     * @throws RestException {@code BAD_REQUEST} when the manager reports the request as
     *                       unsuccessful, {@code INTERNAL_SERVER_ERROR} when the future completes
     *                       exceptionally, {@code REQUEST_TIMEOUT} when the wait is interrupted
     */
    private void updateRequest(Function.FunctionMetaData functionMetaData) {
        FunctionMetaDataManager functionMetaDataManager = this.worker().getFunctionMetaDataManager();
        CompletableFuture<RequestResult> completableFuture = functionMetaDataManager.updateFunction(functionMetaData);
        try {
            RequestResult requestResult = completableFuture.get();
            if (!requestResult.isSuccess()) {
                throw new RestException(Response.Status.BAD_REQUEST, requestResult.getMessage());
            }
        } catch (ExecutionException e) {
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new RestException(Response.Status.REQUEST_TIMEOUT, e.getMessage());
        }
    }

    /**
     * Returns the connector definitions held by the worker's connectors manager.
     *
     * @return whatever {@code ConnectorsManager.getConnectors()} returns
     */
    public List<ConnectorDefinition> getListOfConnectors() {
        boolean available = this.isWorkerServiceAvailable();
        if (!available) {
            this.throwUnavailableException();
        }
        List<ConnectorDefinition> connectors = this.worker().getConnectorsManager().getConnectors();
        return connectors;
    }

    /**
     * Publishes one message to an input topic of a function and, when the function has an
     * output (sink) topic, waits for the result produced for that message.
     *
     * <p>The payload is taken from {@code uploadedInputStream} when it is non-null, otherwise
     * from {@code input}. A result is matched by comparing the {@code __pfn_input_msg_id__} and
     * {@code __pfn_input_topic__} properties of messages read from the output topic with the id
     * and topic of the message that was sent.
     *
     * @param tenant              tenant the function belongs to
     * @param namespace           namespace the function belongs to
     * @param functionName        name of the function
     * @param input               payload as a string; used only when no stream is supplied
     * @param uploadedInputStream payload as a stream; takes precedence over {@code input}
     * @param topic               input topic to write to, or {@code null} to use the function's
     *                            only input topic
     * @return the matching output message data as a string, or {@code null} when the function
     *         has no output topic
     * @throws RestException {@code BAD_REQUEST} for invalid parameters, an ambiguous or unknown
     *                       topic, a topic that cannot be queried, or a payload that fails schema
     *                       serialization; {@code NOT_FOUND} for an unknown function;
     *                       {@code REQUEST_TIMEOUT} when no matching result arrives;
     *                       {@code INTERNAL_SERVER_ERROR} on I/O failure
     */
    public String triggerFunction(String tenant, String namespace, String functionName, String input, InputStream uploadedInputStream, String topic) {
        String inputTopicToWrite;
        if (!this.isWorkerServiceAvailable()) {
            this.throwUnavailableException();
        }
        try {
            this.validateTriggerRequestParams(tenant, namespace, functionName, topic, input, uploadedInputStream);
        } catch (IllegalArgumentException e) {
            log.error("Invalid trigger function request @ /{}/{}/{}", new Object[]{tenant, namespace, functionName, e});
            throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
        }
        FunctionMetaDataManager functionMetaDataManager = this.worker().getFunctionMetaDataManager();
        if (!functionMetaDataManager.containsFunction(tenant, namespace, functionName)) {
            log.error("Function in trigger function does not exist @ /{}/{}/{}", new Object[]{tenant, namespace, functionName});
            throw new RestException(Response.Status.NOT_FOUND, String.format("Function %s doesn't exist", functionName));
        }
        Function.FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, functionName);
        if (topic != null) {
            inputTopicToWrite = topic;
        } else {
            // Without an explicit topic the choice is only unambiguous for exactly one input.
            if (functionMetaData.getFunctionDetails().getSource().getInputSpecsCount() != 1) {
                log.error("Function in trigger function has more than 1 input topics @ /{}/{}/{}", new Object[]{tenant, namespace, functionName});
                throw new RestException(Response.Status.BAD_REQUEST, "Function in trigger function has more than 1 input topics");
            }
            inputTopicToWrite = (String)functionMetaData.getFunctionDetails().getSource().getInputSpecsMap().keySet().iterator().next();
        }
        if (functionMetaData.getFunctionDetails().getSource().getInputSpecsCount() == 0 || !functionMetaData.getFunctionDetails().getSource().getInputSpecsMap().containsKey(inputTopicToWrite)) {
            log.error("Function in trigger function has unidentified topic @ /{}/{}/{} {}", new Object[]{tenant, namespace, functionName, inputTopicToWrite});
            throw new RestException(Response.Status.BAD_REQUEST, "Function in trigger function has unidentified topic");
        }
        try {
            // The result is discarded; the call is only used to see whether the topic can be queried.
            this.worker().getBrokerAdmin().topics().getSubscriptions(inputTopicToWrite);
        } catch (PulsarAdminException e) {
            log.error("Function in trigger function is not ready @ /{}/{}/{}", new Object[]{tenant, namespace, functionName});
            throw new RestException(Response.Status.BAD_REQUEST, "Function in trigger function is not ready");
        }
        String outputTopic = functionMetaData.getFunctionDetails().getSink().getTopic();
        Reader reader = null;
        Producer producer = null;
        try {
            byte[] targetArray;
            if (outputTopic != null && !outputTopic.isEmpty()) {
                // Open the reader before publishing so the result cannot be missed.
                reader = this.worker().getClient().newReader().topic(outputTopic).startMessageId(MessageId.latest).create();
            }
            producer = this.worker().getClient().newProducer(Schema.AUTO_PRODUCE_BYTES()).topic(inputTopicToWrite).create();
            if (uploadedInputStream != null) {
                // available() is only an estimate and a single read() may return fewer bytes
                // than requested, so drain the stream fully instead.
                targetArray = IOUtils.toByteArray(uploadedInputStream);
            } else {
                // NOTE(review): uses the platform default charset, as does the String built
                // from the result below — confirm whether UTF-8 should be enforced.
                targetArray = input.getBytes();
            }
            MessageId msgId = producer.send((Object)targetArray);
            if (reader == null) {
                return null;
            }
            long curTime = System.currentTimeMillis();
            long maxTime = curTime + 1000L;
            // The loop deadline is 1s, but each readNext call may itself block for up to 10s.
            while (curTime < maxTime) {
                Message msg = reader.readNext(10000, TimeUnit.MILLISECONDS);
                if (msg == null) {
                    throw new RestException(Response.Status.REQUEST_TIMEOUT, "Request Timed Out");
                }
                if (msg.getProperties().containsKey("__pfn_input_msg_id__") && msg.getProperties().containsKey("__pfn_input_topic__")) {
                    MessageId newMsgId = MessageId.fromByteArray((byte[])Base64.getDecoder().decode((String)msg.getProperties().get("__pfn_input_msg_id__")));
                    if (msgId.equals(newMsgId) && msg.getProperties().get("__pfn_input_topic__").equals(TopicName.get((String)inputTopicToWrite).toString())) {
                        return new String(msg.getData());
                    }
                }
                curTime = System.currentTimeMillis();
            }
            throw new RestException(Response.Status.REQUEST_TIMEOUT, "Request Timed Out");
        } catch (SchemaSerializationException e) {
            throw new RestException(Response.Status.BAD_REQUEST, String.format("Failed to serialize input with error: %s. Please check if input data conforms with the schema of the input topic.", e.getMessage()));
        } catch (IOException e) {
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
        } finally {
            if (reader != null) {
                reader.closeAsync();
            }
            if (producer != null) {
                producer.closeAsync();
            }
        }
    }

    /**
     * Looks up one key in the state table of a function.
     *
     * <p>The table is named after the function and opened through a storage client that is
     * created on first use and then cached in {@code this.storageClient}.
     *
     * @param tenant       tenant the function belongs to
     * @param namespace    namespace the function belongs to
     * @param functionName name of the function; also used as the table name
     * @param key          state key, encoded as UTF-8 for the lookup
     * @return the state entry: a numeric value when the stored entry is a number, otherwise the
     *         stored bytes decoded as UTF-8, together with the entry's version
     * @throws RestException {@code BAD_REQUEST} for invalid parameters, {@code NOT_FOUND} when
     *                       the key has no entry, {@code INTERNAL_SERVER_ERROR} on any other failure
     */
    public FunctionState getFunctionState(String tenant, String namespace, String functionName, String key) {
        FunctionState value;
        if (!this.isWorkerServiceAvailable()) {
            this.throwUnavailableException();
        }
        if (null == this.worker().getStateStoreAdminClient()) {
            this.throwStateStoreUnvailableResponse();
        }
        try {
            this.validateGetFunctionStateParams(tenant, namespace, functionName, key);
        } catch (IllegalArgumentException e) {
            log.error("Invalid getFunctionState request @ /{}/{}/{}/{}", new Object[]{tenant, namespace, functionName, key, e});
            throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
        }
        String tableNs = StateUtils.getStateNamespace((String)tenant, (String)namespace);
        String tableName = functionName;
        String stateStorageServiceUrl = this.worker().getWorkerConfig().getStateStorageServiceUrl();
        // NOTE(review): the cached client is built with the namespace of the first request and
        // reused for later requests regardless of their tenant/namespace — confirm this is intended.
        if (this.storageClient.get() == null) {
            this.storageClient.compareAndSet(null, StorageClientBuilder.newBuilder().withSettings(StorageClientSettings.newBuilder().serviceUri(stateStorageServiceUrl).clientName("functions-admin").build()).withNamespace(tableNs).build());
        }
        try (Table table = (Table)FutureUtils.result((CompletableFuture)this.storageClient.get().openTable(tableName));
             KeyValue kv = (KeyValue)FutureUtils.result((CompletableFuture)table.getKv((Object)Unpooled.wrappedBuffer((byte[])key.getBytes(StandardCharsets.UTF_8))));){
            if (null == kv) {
                throw new RestException(Response.Status.NOT_FOUND, "key '" + key + "' doesn't exist.");
            }
            value = kv.isNumber() ? new FunctionState(key, null, Long.valueOf(kv.numberValue()), Long.valueOf(kv.version())) : new FunctionState(key, new String(ByteBufUtil.getBytes((ByteBuf)((ByteBuf)kv.value())), StandardCharsets.UTF_8), null, Long.valueOf(kv.version()));
        } catch (WebApplicationException we) {
            // Keep the status chosen above (e.g. NOT_FOUND) instead of letting the generic
            // handler below rewrite it to INTERNAL_SERVER_ERROR.
            throw we;
        } catch (Exception e) {
            log.error("Error while getFunctionState request @ /{}/{}/{}/{}", new Object[]{tenant, namespace, functionName, key, e});
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
        }
        return value;
    }

    /**
     * Stores an uploaded function package under {@code path} using
     * {@code Utils.uploadToBookeeper} and the worker's dlog namespace.
     *
     * @param uploadedInputStream package contents; must not be {@code null}
     * @param path                destination path; must not be {@code null}
     * @throws RestException {@code BAD_REQUEST} when either argument is {@code null},
     *                       {@code INTERNAL_SERVER_ERROR} when the upload fails with an I/O error
     */
    public void uploadFunction(InputStream uploadedInputStream, String path) {
        boolean missingArgument = uploadedInputStream == null || path == null;
        if (missingArgument) {
            IllegalArgumentException invalid = new IllegalArgumentException("Function Package is not provided " + path);
            log.error("Invalid upload function request @ /{}", path, invalid);
            throw new RestException(Response.Status.BAD_REQUEST, invalid.getMessage());
        }

        log.info("Uploading function package to {}", path);
        try {
            Utils.uploadToBookeeper(this.worker().getDlogNamespace(), uploadedInputStream, path);
        } catch (IOException failure) {
            log.error("Error uploading file {}", path, failure);
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, failure.getMessage());
        }
    }

    /**
     * Builds a streaming response body that copies a function package to the client.
     *
     * <p>The source is chosen from the prefix of {@code path}: an HTTP URL is fetched, a file
     * URL is read from the local file system, and anything else is downloaded through
     * {@code Utils.downloadFromBookkeeper}. Nothing is read until the returned object's
     * {@code write} method is invoked.
     *
     * <p>NOTE(review): {@code path} is used as given, so HTTP and file URLs are opened by the
     * worker process itself — confirm that callers restrict who may supply it.
     *
     * @param path package location (HTTP URL, file URL, or a path for the BookKeeper download)
     * @return a {@link StreamingOutput} that writes the package bytes
     */
    public StreamingOutput downloadFunction(final String path) {
        StreamingOutput streamingOutput = new StreamingOutput(){

            @Override
            public void write(OutputStream output) throws IOException {
                if (path.startsWith(org.apache.pulsar.common.functions.Utils.HTTP)) {
                    URL url = new URL(path);
                    // Close the remote stream once copied; it was previously left open.
                    try (InputStream in = url.openStream()) {
                        IOUtils.copy((InputStream)in, (OutputStream)output);
                    }
                } else if (path.startsWith(org.apache.pulsar.common.functions.Utils.FILE)) {
                    URL url = new URL(path);
                    File file;
                    try {
                        file = new File(url.toURI());
                    } catch (URISyntaxException e) {
                        throw new IllegalArgumentException("invalid file url path: " + path, e);
                    }
                    // Close the file handle once copied; it was previously left open.
                    try (InputStream in = new FileInputStream(file)) {
                        IOUtils.copy((InputStream)in, (OutputStream)output);
                    }
                } else {
                    Utils.downloadFromBookkeeper(ComponentImpl.this.worker().getDlogNamespace(), output, path);
                }
            }
        };
        return streamingOutput;
    }

    /**
     * Checks that a list request names both a tenant and a namespace.
     *
     * @throws IllegalArgumentException when {@code tenant} or {@code namespace} is {@code null};
     *                                  the tenant is checked first
     */
    private void validateListFunctionRequestParams(String tenant, String namespace) throws IllegalArgumentException {
        String problem = null;
        if (tenant == null) {
            problem = "Tenant is not provided";
        } else if (namespace == null) {
            problem = "Namespace is not provided";
        }
        if (problem != null) {
            throw new IllegalArgumentException(problem);
        }
    }

    /**
     * Validates the parameters of a request that addresses a single component instance.
     *
     * <p>Runs {@link #validateGetFunctionRequestParams} first and then requires an instance id.
     *
     * @throws IllegalArgumentException when any of the checked arguments is {@code null}
     */
    protected void validateGetFunctionInstanceRequestParams(String tenant, String namespace, String componentName, Utils.ComponentType componentType, String instanceId) throws IllegalArgumentException {
        this.validateGetFunctionRequestParams(tenant, namespace, componentName, componentType);
        if (instanceId != null) {
            return;
        }
        String message = String.format("%s Instance Id is not provided", componentType);
        throw new IllegalArgumentException(message);
    }

    /**
     * Checks that a request identifies a component by tenant, namespace and name.
     *
     * @param componentType only used to prefix the message for a missing name
     * @throws IllegalArgumentException for the first of tenant, namespace, subject that is {@code null}
     */
    protected void validateGetFunctionRequestParams(String tenant, String namespace, String subject, Utils.ComponentType componentType) throws IllegalArgumentException {
        String problem = null;
        if (tenant == null) {
            problem = "Tenant is not provided";
        } else if (namespace == null) {
            problem = "Namespace is not provided";
        } else if (subject == null) {
            problem = String.format("%s Name is not provided", componentType);
        }
        if (problem != null) {
            throw new IllegalArgumentException(problem);
        }
    }

    /**
     * Checks that a deregister request identifies a component by tenant, namespace and name.
     *
     * @param componentType only used to prefix the message for a missing name
     * @throws IllegalArgumentException for the first of tenant, namespace, subject that is {@code null}
     */
    private void validateDeregisterRequestParams(String tenant, String namespace, String subject, Utils.ComponentType componentType) throws IllegalArgumentException {
        final String message;
        if (tenant == null) {
            message = "Tenant is not provided";
        } else if (namespace == null) {
            message = "Namespace is not provided";
        } else if (subject == null) {
            message = String.format("%s Name is not provided", componentType);
        } else {
            // All three identifiers are present; nothing to report.
            return;
        }
        throw new IllegalArgumentException(message);
    }

    /**
     * Validates an update/register request whose package is referenced by URL.
     *
     * <p>Rejects URLs that {@code Utils.isFunctionPackageUrlSupported} does not accept, then
     * delegates to the general validation with the URL and no uploaded file.
     *
     * @return the function details produced by the general validation
     * @throws IllegalArgumentException when the package URL is not supported
     * @throws IOException              propagated from the general validation
     */
    private Function.FunctionDetails validateUpdateRequestParamsWithPkgUrl(String tenant, String namespace, String componentName, String functionPkgUrl, String functionDetailsJson, String componentConfigJson, Utils.ComponentType componentType) throws IllegalArgumentException, IOException {
        boolean urlSupported = org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported(functionPkgUrl);
        if (!urlSupported) {
            throw new IllegalArgumentException("Function Package url is not valid. supported url (http/https/file)");
        }
        return this.validateUpdateRequestParams(tenant, namespace, componentName, functionDetailsJson, componentConfigJson, componentType, functionPkgUrl, null);
    }

    /**
     * Validates an update/register request whose package was uploaded with the request.
     *
     * <p>Delegates to the general validation with the uploaded file and no package URL. Unless
     * the resulting details refer to builtin code, both the file and its form-data details must
     * be present.
     *
     * @return the function details produced by the general validation
     * @throws IllegalArgumentException when a package is required but was not supplied
     * @throws IOException              propagated from the general validation
     */
    private Function.FunctionDetails validateUpdateRequestParams(String tenant, String namespace, String componentName, File uploadedInputStreamAsFile, FormDataContentDisposition fileDetail, String functionDetailsJson, String componentConfigJson, Utils.ComponentType componentType) throws IllegalArgumentException, IOException {
        Function.FunctionDetails details = this.validateUpdateRequestParams(tenant, namespace, componentName, functionDetailsJson, componentConfigJson, componentType, null, uploadedInputStreamAsFile);
        if (this.isFunctionCodeBuiltin(details)) {
            return details;
        }
        boolean packageSupplied = uploadedInputStreamAsFile != null && fileDetail != null;
        if (!packageSupplied) {
            throw new IllegalArgumentException(componentType + " Package is not provided");
        }
        return details;
    }

    /**
     * Validates an update request against the package that is already stored for the component.
     *
     * <p>Downloads the package referenced by {@code packageLocationMetaData} into a temporary
     * file (marked for deletion when the JVM exits) and runs the general validation on it.
     *
     * @return the function details produced by the general validation
     * @throws Exception when the temporary file cannot be created, or propagated from the
     *                   download or the general validation
     */
    private Function.FunctionDetails validateUpdateRequestParamsWithExistingMetadata(String tenant, String namespace, String componentName, Function.PackageLocationMetaData packageLocationMetaData, String componentConfigJson, Utils.ComponentType componentType) throws Exception {
        File existingPackage = File.createTempFile("functions", null);
        existingPackage.deleteOnExit();
        Utils.downloadFromBookkeeper(this.worker().getDlogNamespace(), existingPackage, packageLocationMetaData.getPackagePath());
        Function.FunctionDetails details = this.validateUpdateRequestParams(tenant, namespace, componentName, null, componentConfigJson, componentType, null, existingPackage);
        return details;
    }

    /**
     * Copies a stream into a newly created temporary file.
     *
     * <p>The file is registered for deletion at JVM exit. If the copy fails with an I/O error
     * the file is deleted straight away, so a failed upload does not leave it on disk for the
     * remaining lifetime of the process.
     *
     * @param uploadedInputStream stream to drain; it is not closed by this method
     * @return the temporary file holding the stream's contents
     * @throws RuntimeException wrapping the {@link IOException} when the file cannot be created
     *                          or written
     */
    private static File dumpToTmpFile(InputStream uploadedInputStream) {
        File tmpFile;
        try {
            tmpFile = File.createTempFile("functions", null);
        } catch (IOException e) {
            throw new RuntimeException("Cannot create a temporary file", e);
        }
        tmpFile.deleteOnExit();
        try {
            Files.copy(uploadedInputStream, tmpFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
            return tmpFile;
        } catch (IOException e) {
            // Best effort: the partially written file is of no use to anyone.
            tmpFile.delete();
            throw new RuntimeException("Cannot create a temporary file", e);
        }
    }

    /**
     * Checks that a state lookup names a tenant, namespace, function and key.
     *
     * @throws IllegalArgumentException for the first of the four arguments, in declaration
     *                                  order, that is {@code null}
     */
    private void validateGetFunctionStateParams(String tenant, String namespace, String functionName, String key) throws IllegalArgumentException {
        String[] labels = {"Tenant", "Namespace", "Function Name", "Key"};
        String[] values = {tenant, namespace, functionName, key};
        for (int i = 0; i < values.length; i++) {
            if (values[i] == null) {
                throw new IllegalArgumentException(labels[i] + " is not provided");
            }
        }
    }

    /**
     * Tells whether the details refer to builtin code.
     *
     * @return {@code true} when the details have a source with a non-empty builtin name, or
     *         otherwise have a sink with a non-empty builtin name
     */
    private boolean isFunctionCodeBuiltin(Function.FunctionDetails functionDetails) {
        if (functionDetails.hasSource()) {
            String sourceBuiltin = functionDetails.getSource().getBuiltin();
            if (!StringUtils.isEmpty(sourceBuiltin)) {
                return true;
            }
        }
        if (!functionDetails.hasSink()) {
            return false;
        }
        String sinkBuiltin = functionDetails.getSink().getBuiltin();
        return !StringUtils.isEmpty(sinkBuiltin);
    }

    /**
     * Returns the builtin name referenced by the details, preferring the source over the sink.
     *
     * @return the source's builtin name when present and non-empty, else the sink's builtin
     *         name when present and non-empty, else {@code null}
     */
    private String getFunctionCodeBuiltin(Function.FunctionDetails functionDetails) {
        if (functionDetails.hasSource()) {
            String sourceBuiltin = functionDetails.getSource().getBuiltin();
            if (!StringUtils.isEmpty(sourceBuiltin)) {
                return sourceBuiltin;
            }
        }
        if (functionDetails.hasSink()) {
            String sinkBuiltin = functionDetails.getSink().getBuiltin();
            if (!StringUtils.isEmpty(sinkBuiltin)) {
                return sinkBuiltin;
            }
        }
        return null;
    }

    /**
     * Validates a register/update request and converts it into {@link Function.FunctionDetails}.
     *
     * <p>The request is handled by the first matching path:
     * <ol>
     *   <li>{@code FUNCTION} with a non-empty {@code componentConfigJson}: parsed as a {@link FunctionConfig};</li>
     *   <li>{@code SOURCE}: {@code componentConfigJson} is parsed as a {@link SourceConfig};</li>
     *   <li>{@code SINK}: {@code componentConfigJson} is parsed as a {@link SinkConfig};</li>
     *   <li>otherwise: {@code functionDetailsJson} is merged into a fresh builder and checked here.</li>
     * </ol>
     * In the first three paths the tenant, namespace and name from the request override whatever
     * the JSON config contains.
     *
     * @param tenant tenant of the component; must not be {@code null}
     * @param namespace namespace of the component; must not be {@code null}
     * @param componentName name of the component; must not be {@code null}
     * @param functionDetailsJson JSON form of the function details, used only by the fallback path
     * @param componentConfigJson JSON form of the function/source/sink config
     * @param componentType kind of component being registered or updated
     * @param functionPkgUrl optional package URL; when present it is preferred over the uploaded file
     * @param uploadedInputStreamAsFile uploaded package stored as a local file, may be {@code null}
     * @return the validated function details
     * @throws IllegalArgumentException if a mandatory value is missing, a referenced archive cannot be
     *         resolved, the package cannot be loaded, or parallelism is not positive
     * @throws IOException declared for the delegated validation/conversion helpers
     */
    private Function.FunctionDetails validateUpdateRequestParams(String tenant, String namespace, String componentName, String functionDetailsJson, String componentConfigJson, Utils.ComponentType componentType, String functionPkgUrl, File uploadedInputStreamAsFile) throws IOException {
        if (tenant == null) {
            throw new IllegalArgumentException("Tenant is not provided");
        }
        if (namespace == null) {
            throw new IllegalArgumentException("Namespace is not provided");
        }
        if (componentName == null) {
            throw new IllegalArgumentException(String.format("%s Name is not provided", componentType));
        }

        if (componentType.equals(Utils.ComponentType.FUNCTION) && !StringUtils.isEmpty(componentConfigJson)) {
            FunctionConfig functionConfig = new Gson().fromJson(componentConfigJson, FunctionConfig.class);
            functionConfig.setTenant(tenant);
            functionConfig.setNamespace(namespace);
            functionConfig.setName(componentName);
            FunctionConfigUtils.inferMissingArguments(functionConfig);
            ClassLoader clsLoader = FunctionConfigUtils.validate(functionConfig, functionPkgUrl, uploadedInputStreamAsFile);
            return FunctionConfigUtils.convert(functionConfig, clsLoader);
        }

        if (componentType.equals(Utils.ComponentType.SOURCE)) {
            SourceConfig sourceConfig = new Gson().fromJson(componentConfigJson, SourceConfig.class);
            // Gson yields null for null/empty JSON; fail with a clear message instead of an NPE below.
            if (sourceConfig == null) {
                throw new IllegalArgumentException("Source config is not provided");
            }
            sourceConfig.setTenant(tenant);
            sourceConfig.setNamespace(namespace);
            sourceConfig.setName(componentName);
            org.apache.pulsar.common.functions.Utils.inferMissingArguments(sourceConfig);
            Path archivePath = null;
            if (!StringUtils.isEmpty(sourceConfig.getArchive())) {
                String builtinArchive = sourceConfig.getArchive();
                if (builtinArchive.startsWith(org.apache.pulsar.common.functions.Utils.BUILTIN)) {
                    builtinArchive = builtinArchive.replaceFirst("^builtin://", "");
                }
                try {
                    archivePath = this.worker().getConnectorsManager().getSourceArchive(builtinArchive);
                }
                catch (Exception e) {
                    // Report the archive that was looked up (archivePath is still null here) and keep the cause.
                    throw new IllegalArgumentException(String.format("No Source archive %s found", builtinArchive), e);
                }
            }
            SourceConfigUtils.ExtractedSourceDetails sourceDetails = SourceConfigUtils.validate(sourceConfig, archivePath, functionPkgUrl, uploadedInputStreamAsFile);
            return SourceConfigUtils.convert(sourceConfig, sourceDetails);
        }

        if (componentType.equals(Utils.ComponentType.SINK)) {
            SinkConfig sinkConfig = new Gson().fromJson(componentConfigJson, SinkConfig.class);
            // Gson yields null for null/empty JSON; fail with a clear message instead of an NPE below.
            if (sinkConfig == null) {
                throw new IllegalArgumentException("Sink config is not provided");
            }
            sinkConfig.setTenant(tenant);
            sinkConfig.setNamespace(namespace);
            sinkConfig.setName(componentName);
            org.apache.pulsar.common.functions.Utils.inferMissingArguments(sinkConfig);
            Path archivePath = null;
            if (!StringUtils.isEmpty(sinkConfig.getArchive())) {
                String builtinArchive = sinkConfig.getArchive();
                if (builtinArchive.startsWith(org.apache.pulsar.common.functions.Utils.BUILTIN)) {
                    builtinArchive = builtinArchive.replaceFirst("^builtin://", "");
                }
                try {
                    archivePath = this.worker().getConnectorsManager().getSinkArchive(builtinArchive);
                }
                catch (Exception e) {
                    // Report the archive that was looked up (archivePath is still null here) and keep the cause.
                    throw new IllegalArgumentException(String.format("No Sink archive %s found", builtinArchive), e);
                }
            }
            SinkConfigUtils.ExtractedSinkDetails sinkDetails = SinkConfigUtils.validate(sinkConfig, archivePath, functionPkgUrl, uploadedInputStreamAsFile);
            return SinkConfigUtils.convert(sinkConfig, sinkDetails);
        }

        // Fallback: the caller supplied raw function details as JSON.
        Function.FunctionDetails.Builder functionDetailsBuilder = Function.FunctionDetails.newBuilder();
        org.apache.pulsar.functions.utils.Utils.mergeJson(functionDetailsJson, functionDetailsBuilder);
        if (StringUtils.isNotBlank(functionPkgUrl)) {
            functionDetailsBuilder.setPackageUrl(functionPkgUrl);
        }

        ClassLoader clsLoader = null;
        if (functionDetailsBuilder.getRuntime() == Function.FunctionDetails.Runtime.JAVA) {
            try {
                // Exactly one package origin is used: the URL when given, otherwise the uploaded file.
                // Loading the uploaded file unconditionally would discard the URL class loader and
                // fail whenever only a URL was supplied.
                if (!StringUtils.isEmpty(functionPkgUrl)) {
                    clsLoader = org.apache.pulsar.functions.utils.Utils.extractClassLoader(functionPkgUrl);
                } else {
                    clsLoader = org.apache.pulsar.functions.utils.Utils.loadJar(uploadedInputStreamAsFile);
                }
            }
            catch (Exception e) {
                throw new IllegalArgumentException("Corrupted Jar file", e);
            }
        }
        this.validateFunctionClassTypes(clsLoader, functionDetailsBuilder);

        Function.FunctionDetails functionDetails = functionDetailsBuilder.build();
        List<String> missingFields = new LinkedList<String>();
        if (StringUtils.isEmpty(functionDetails.getTenant())) {
            missingFields.add("Tenant");
        }
        if (StringUtils.isEmpty(functionDetails.getNamespace())) {
            missingFields.add("Namespace");
        }
        if (StringUtils.isEmpty(functionDetails.getName())) {
            missingFields.add("Name");
        }
        if (StringUtils.isEmpty(functionDetails.getClassName())) {
            missingFields.add("ClassName");
        }
        if (!functionDetails.getSource().isInitialized()) {
            missingFields.add("Source");
        }
        if (!functionDetails.getSink().isInitialized()) {
            missingFields.add("Sink");
        }
        if (!missingFields.isEmpty()) {
            String errorMessage = StringUtils.join(missingFields, ",");
            throw new IllegalArgumentException(errorMessage + " is not provided");
        }
        if (functionDetails.getParallelism() <= 0) {
            throw new IllegalArgumentException("Parallelism needs to be set to a positive number");
        }
        return functionDetails;
    }

    /*
     * Unable to fully structure code
     */
    private void validateFunctionClassTypes(ClassLoader classLoader, Function.FunctionDetails.Builder functionDetailsBuilder) {
        if (classLoader == null) {
            return;
        }
        if (StringUtils.isBlank((CharSequence)functionDetailsBuilder.getClassName())) {
            throw new IllegalArgumentException("function class-name can't be empty");
        }
        functionObject = Reflections.createInstance((String)functionDetailsBuilder.getClassName(), (ClassLoader)classLoader);
        typeArgs = org.apache.pulsar.functions.utils.Utils.getFunctionTypes((Object)functionObject, (boolean)false);
        if (!(functionObject instanceof Function) && !(functionObject instanceof java.util.function.Function)) {
            throw new RuntimeException("User class must either be Function or java.util.Function");
        }
        if (functionDetailsBuilder.hasSource() && functionDetailsBuilder.getSource() != null && StringUtils.isNotBlank((CharSequence)functionDetailsBuilder.getSource().getClassName())) {
            try {
                sourceClassName = functionDetailsBuilder.getSource().getClassName();
                argClassName = this.getTypeArg(sourceClassName, Source.class, classLoader).getName();
                functionDetailsBuilder.setSource(functionDetailsBuilder.getSourceBuilder().setTypeClassName(argClassName));
                if (functionDetailsBuilder.hasSink() && !StringUtils.isBlank((CharSequence)functionDetailsBuilder.getSink().getClassName())) ** GOTO lbl27
                functionDetailsBuilder.setSink(functionDetailsBuilder.getSinkBuilder().setTypeClassName(argClassName));
            }
            catch (IllegalArgumentException ie) {
                throw ie;
            }
            catch (Exception e) {
                ComponentImpl.log.error("Failed to validate source class", (Throwable)e);
                throw new IllegalArgumentException("Failed to validate source class-name", e);
            }
        } else if (StringUtils.isBlank((CharSequence)functionDetailsBuilder.getSourceBuilder().getTypeClassName())) {
            functionDetailsBuilder.setSource(functionDetailsBuilder.getSourceBuilder().setTypeClassName(typeArgs[0].getName()));
        }
lbl27:
        // 5 sources

        if (functionDetailsBuilder.hasSink() && functionDetailsBuilder.getSink() != null && StringUtils.isNotBlank((CharSequence)functionDetailsBuilder.getSink().getClassName())) {
            try {
                sinkClassName = functionDetailsBuilder.getSink().getClassName();
                argClassName = this.getTypeArg(sinkClassName, Sink.class, classLoader).getName();
                functionDetailsBuilder.setSink(functionDetailsBuilder.getSinkBuilder().setTypeClassName(argClassName));
                if (functionDetailsBuilder.hasSource() && !StringUtils.isBlank((CharSequence)functionDetailsBuilder.getSource().getClassName())) ** GOTO lbl45
                functionDetailsBuilder.setSource(functionDetailsBuilder.getSourceBuilder().setTypeClassName(argClassName));
            }
            catch (IllegalArgumentException ie) {
                throw ie;
            }
            catch (Exception e) {
                ComponentImpl.log.error("Failed to validate sink class", (Throwable)e);
                throw new IllegalArgumentException("Failed to validate sink class-name", e);
            }
        } else if (StringUtils.isBlank((CharSequence)functionDetailsBuilder.getSinkBuilder().getTypeClassName())) {
            functionDetailsBuilder.setSink(functionDetailsBuilder.getSinkBuilder().setTypeClassName(typeArgs[1].getName()));
        }
lbl45:
        // 5 sources

    }

    /**
     * Loads {@code className} and resolves the raw type argument it binds for {@code funClass}.
     *
     * @param className name of the class to load
     * @param funClass type the loaded class must be assignable to
     * @param classLoader loader used to load {@code className}
     * @return the raw type argument resolved by {@link TypeResolver#resolveRawArgument}
     * @throws ClassNotFoundException if {@code classLoader} cannot load {@code className}
     * @throws IllegalArgumentException if the loaded class is not assignable to {@code funClass}
     */
    private Class<?> getTypeArg(String className, Class<?> funClass, ClassLoader classLoader) throws ClassNotFoundException {
        Class<?> candidate = classLoader.loadClass(className);
        if (funClass.isAssignableFrom(candidate)) {
            return TypeResolver.resolveRawArgument(funClass, candidate);
        }
        String message = String.format("class %s is not type of %s", className, funClass.getName());
        throw new IllegalArgumentException(message);
    }

    /**
     * Checks that a trigger request carries everything it needs.
     * Checks run in order (tenant, namespace, function name, trigger data) and the first
     * failing one determines the error. {@code topic} is not checked.
     *
     * @param tenant tenant of the function; must not be {@code null}
     * @param namespace namespace of the function; must not be {@code null}
     * @param functionName name of the function; must not be {@code null}
     * @param topic target topic; not validated here
     * @param input inline trigger data, may be {@code null} when a stream is uploaded
     * @param uploadedInputStream uploaded trigger data, may be {@code null} when {@code input} is set
     * @throws IllegalArgumentException if a mandatory value is missing or no trigger data is given
     */
    private void validateTriggerRequestParams(String tenant, String namespace, String functionName, String topic, String input, InputStream uploadedInputStream) {
        String problem = null;
        if (tenant == null) {
            problem = "Tenant is not provided";
        } else if (namespace == null) {
            problem = "Namespace is not provided";
        } else if (functionName == null) {
            problem = "Function Name is not provided";
        } else if (input == null && uploadedInputStream == null) {
            problem = "Trigger Data is not provided";
        }
        if (problem != null) {
            throw new IllegalArgumentException(problem);
        }
    }

    /**
     * Always throws a {@code SERVICE_UNAVAILABLE} {@link RestException} stating that the
     * function worker service has not finished initializing.
     *
     * @throws RestException always
     */
    protected void throwUnavailableException() {
        final String message = "Function worker service is not done initializing. Please try again in a little while.";
        throw new RestException(Response.Status.SERVICE_UNAVAILABLE, message);
    }

    /**
     * Always throws a {@code SERVICE_UNAVAILABLE} {@link RestException} stating that the
     * state storage client has not finished initializing.
     *
     * @throws RestException always
     */
    private void throwStateStoreUnvailableResponse() {
        final String message = "State storage client is not done initializing. Please try again in a little while.";
        throw new RestException(Response.Status.SERVICE_UNAVAILABLE, message);
    }

    /**
     * Builds a package path of the form
     * {@code tenant/namespace/<encoded function name>/<unique package name>}.
     * The function name and file name are passed through {@code Codec.encode}; the encoded
     * file name is then handed to {@code Utils.getUniquePackageName}.
     *
     * @param tenant tenant segment, used as-is
     * @param namespace namespace segment, used as-is
     * @param functionName function name, encoded before use
     * @param fileName package file name, encoded and made unique before use
     * @return the four segments joined with {@code /}
     */
    public static String createPackagePath(String tenant, String namespace, String functionName, String fileName) {
        String encodedFunctionName = Codec.encode((String)functionName);
        String uniquePackageName = Utils.getUniquePackageName(Codec.encode((String)fileName));
        return String.format("%s/%s/%s/%s", tenant, namespace, encodedFunctionName, uniquePackageName);
    }

    /**
     * Decides whether {@code clientRole} may act on {@code tenant}.
     *
     * <p>Everything is allowed when authorization is disabled in the worker config, and super
     * users are always allowed. Otherwise the tenant info is fetched from the broker admin and
     * a non-null role is allowed when the tenant's admin roles are unset, empty, or contain it.
     *
     * @param tenant tenant whose admin roles are consulted
     * @param clientRole role of the caller, may be {@code null}
     * @return {@code true} if the role is authorized
     * @throws PulsarAdminException if the tenant info cannot be fetched
     */
    public boolean isAuthorizedRole(String tenant, String clientRole) throws PulsarAdminException {
        if (!this.worker().getWorkerConfig().isAuthorizationEnabled()) {
            return true;
        }
        if (this.isSuperUser(clientRole)) {
            return true;
        }
        // Fetched before the null-role check so a failing lookup surfaces the same way it always did.
        TenantInfo tenantInfo = this.worker().getBrokerAdmin().tenants().getTenantInfo(tenant);
        if (clientRole == null) {
            return false;
        }
        if (tenantInfo.getAdminRoles() == null || tenantInfo.getAdminRoles().isEmpty()) {
            return true;
        }
        return tenantInfo.getAdminRoles().contains(clientRole);
    }

    /**
     * Tells whether {@code clientRole} is listed among the worker config's super-user roles.
     *
     * @param clientRole role of the caller, may be {@code null}
     * @return {@code true} if the role is non-null and is a configured super-user role
     */
    public boolean isSuperUser(String clientRole) {
        if (clientRole == null) {
            return false;
        }
        return this.worker().getWorkerConfig().getSuperUserRoles().contains(clientRole);
    }

    /**
     * Classifies a stored component as source, sink or function from its source and sink specs.
     *
     * <ul>
     *   <li>no input specs on the source spec: {@code SOURCE};</li>
     *   <li>non-empty builtin value on the sink spec: {@code SINK};</li>
     *   <li>sink class name empty or equal to {@link PulsarSink}: {@code FUNCTION};</li>
     *   <li>anything else: {@code SINK}.</li>
     * </ul>
     *
     * @param functionMetaData metadata whose function details are inspected
     * @return the component type derived by the rules above
     */
    public Utils.ComponentType calculateSubjectType(Function.FunctionMetaData functionMetaData) {
        Function.FunctionDetails details = functionMetaData.getFunctionDetails();
        if (details.getSource().getInputSpecsCount() == 0) {
            return Utils.ComponentType.SOURCE;
        }
        Function.SinkSpec sink = details.getSink();
        if (!StringUtils.isEmpty((CharSequence)sink.getBuiltin())) {
            return Utils.ComponentType.SINK;
        }
        String sinkClassName = sink.getClassName();
        boolean usesDefaultSink = StringUtils.isEmpty((CharSequence)sinkClassName) || sinkClassName.equals(PulsarSink.class.getName());
        return usesDefaultSink ? Utils.ComponentType.FUNCTION : Utils.ComponentType.SINK;
    }

    /**
     * Validates a component status request before it is served.
     *
     * <p>Checks, in order: the worker service is available, the request parameters are valid,
     * the component exists in the function metadata manager, and its calculated type matches
     * this resource's component type.
     *
     * @param tenant tenant of the component
     * @param namespace namespace of the component
     * @param componentName name of the component
     * @throws RestException with {@code SERVICE_UNAVAILABLE} if the worker is not available,
     *         {@code BAD_REQUEST} if the parameters are invalid, or {@code NOT_FOUND} if the
     *         component does not exist or is of a different type
     */
    protected void componentStatusRequestValidate(String tenant, String namespace, String componentName) {
        boolean workerAvailable = this.isWorkerServiceAvailable();
        if (!workerAvailable) {
            throw new RestException(Response.Status.SERVICE_UNAVAILABLE, "Function worker service is not done initializing. Please try again in a little while.");
        }

        try {
            this.validateGetFunctionRequestParams(tenant, namespace, componentName, this.componentType);
        }
        catch (IllegalArgumentException invalid) {
            log.error("Invalid get {} Status request @ /{}/{}/{}", new Object[]{this.componentType, tenant, namespace, componentName, invalid});
            throw new RestException(Response.Status.BAD_REQUEST, invalid.getMessage());
        }

        FunctionMetaDataManager metaDataManager = this.worker().getFunctionMetaDataManager();
        boolean registered = metaDataManager.containsFunction(tenant, namespace, componentName);
        if (!registered) {
            log.error("{} in get {} Status does not exist @ /{}/{}/{}", new Object[]{this.componentType, this.componentType, tenant, namespace, componentName});
            String notFound = String.format("%s %s doesn't exist", this.componentType, componentName);
            throw new RestException(Response.Status.NOT_FOUND, notFound);
        }

        // A component registered under this name but of another type is reported as missing.
        Function.FunctionMetaData metaData = metaDataManager.getFunctionMetaData(tenant, namespace, componentName);
        Utils.ComponentType actualType = this.calculateSubjectType(metaData);
        if (!actualType.equals((Object)this.componentType)) {
            log.error("{}/{}/{} is not a {}", new Object[]{tenant, namespace, componentName, this.componentType});
            String notFound = String.format("%s %s doesn't exist", this.componentType, componentName);
            throw new RestException(Response.Status.NOT_FOUND, notFound);
        }
    }

    /**
     * Validates a status request for a single instance of a component.
     * Runs the component-level validation first, then requires
     * {@code 0 <= instanceId < parallelism} for the stored component.
     *
     * @param tenant tenant of the component
     * @param namespace namespace of the component
     * @param componentName name of the component
     * @param instanceId id of the requested instance
     * @throws RestException as thrown by the component-level validation, or with
     *         {@code BAD_REQUEST} if {@code instanceId} is out of range
     */
    protected void componentInstanceStatusRequestValidate(String tenant, String namespace, String componentName, int instanceId) {
        this.componentStatusRequestValidate(tenant, namespace, componentName);

        Function.FunctionMetaData metaData = this.worker().getFunctionMetaDataManager().getFunctionMetaData(tenant, namespace, componentName);
        int parallelism = metaData.getFunctionDetails().getParallelism();
        boolean inRange = instanceId >= 0 && instanceId < parallelism;
        if (inRange) {
            return;
        }
        log.error("instanceId in get {} Status out of bounds @ /{}/{}/{}", new Object[]{this.componentType, tenant, namespace, componentName});
        String reason = String.format("%s %s doesn't have instance with id %s", this.componentType, componentName, instanceId);
        throw new RestException(Response.Status.BAD_REQUEST, reason);
    }

    /**
     * Template for answering status requests for a component ({@code S}) or for one of its
     * instances ({@code T}).
     *
     * <p>The two concrete entry points decide where the status comes from: the local worker,
     * or a redirect to the worker that owns the assignment. Subclasses supply the concrete
     * status objects through the abstract factory methods.
     *
     * @param <S> status type returned for a whole component
     * @param <T> status type returned for a single instance
     */
    protected abstract class GetStatus<S, T> {
        protected GetStatus() {
        }

        /**
         * Status returned when the instance has no assignment, or its assigned worker is not
         * part of the current membership.
         */
        public abstract T notScheduledInstance();

        /**
         * Converts a function status reported by the local runtime.
         * Called with the reported status and the id of the assigned worker.
         */
        public abstract T fromFunctionStatusProto(InstanceCommunication.FunctionStatus var1, String var2);

        /**
         * Status returned when the instance is assigned to this worker but has no runtime info
         * or no runtime spawner. Called with the assigned worker id and an error message
         * (empty when no startup exception is recorded).
         */
        public abstract T notRunning(String var1, String var2);

        /**
         * Returns the status of one instance of a component.
         *
         * <p>If the instance is assigned to this worker the status is read from the local runtime;
         * if it is assigned to another known worker the request is redirected there.
         *
         * @param tenant tenant of the component
         * @param namespace namespace of the component
         * @param name name of the component
         * @param instanceId id of the instance; when the runtime factory is externally managed the
         *        assignment is looked up with id {@code -1} instead
         * @param uri request URI used to build the redirect, may be {@code null}
         * @return the instance status
         * @throws WebApplicationException with a temporary redirect to the owning worker, or with an
         *         internal server error when a redirect is needed but {@code uri} is {@code null}
         * @throws RuntimeException if fetching the status from the local runtime fails or is interrupted
         */
        public T getComponentInstanceStatus(String tenant, String namespace, String name, int instanceId, URI uri) {
            Function.Assignment assignment = ComponentImpl.this.worker().getFunctionRuntimeManager().getRuntimeFactory().externallyManaged() ? ComponentImpl.this.worker().getFunctionRuntimeManager().findFunctionAssignment(tenant, namespace, name, -1) : ComponentImpl.this.worker().getFunctionRuntimeManager().findFunctionAssignment(tenant, namespace, name, instanceId);
            if (assignment == null) {
                return this.notScheduledInstance();
            }
            String assignedWorkerId = assignment.getWorkerId();
            String workerId = ComponentImpl.this.worker().getWorkerConfig().getWorkerId();
            if (assignedWorkerId.equals(workerId)) {
                FunctionRuntimeInfo functionRuntimeInfo = ComponentImpl.this.worker().getFunctionRuntimeManager().getFunctionRuntimeInfo(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId((Function.Instance)assignment.getInstance()));
                if (functionRuntimeInfo == null) {
                    return this.notRunning(assignedWorkerId, "");
                }
                RuntimeSpawner runtimeSpawner = functionRuntimeInfo.getRuntimeSpawner();
                if (runtimeSpawner != null) {
                    try {
                        // Use the spawner that was just null-checked rather than reading the field again.
                        return this.fromFunctionStatusProto((InstanceCommunication.FunctionStatus)runtimeSpawner.getFunctionStatus(instanceId).get(), assignedWorkerId);
                    }
                    catch (InterruptedException e) {
                        // Restore the interrupt flag so callers up the stack can still observe it.
                        Thread.currentThread().interrupt();
                        throw new RuntimeException(e);
                    }
                    catch (ExecutionException e) {
                        throw new RuntimeException(e);
                    }
                }
                String message = functionRuntimeInfo.getStartupException() != null ? functionRuntimeInfo.getStartupException().getMessage() : "";
                return this.notRunning(assignedWorkerId, message);
            }
            WorkerInfo workerInfo = this.findWorkerInMembership(assignedWorkerId);
            if (workerInfo == null) {
                return this.notScheduledInstance();
            }
            throw this.redirectToWorker(uri, workerInfo);
        }

        /**
         * Computes the component status from its assignments when the runtime is not externally
         * managed. Called with tenant, namespace, name, the component's assignments and the request URI.
         */
        public abstract S getStatus(String var1, String var2, String var3, Collection<Function.Assignment> var4, URI var5) throws PulsarAdminException;

        /**
         * Computes the component status when the runtime is externally managed and this worker owns
         * the component. Called with tenant, namespace, name and the component's parallelism.
         */
        public abstract S getStatusExternal(String var1, String var2, String var3, int var4);

        /**
         * Status returned when no owner can be determined for an externally managed component.
         * Called with the component's parallelism.
         */
        public abstract S emptyStatus(int var1);

        /**
         * Returns the status of a whole component.
         *
         * <p>For externally managed runtimes the first assignment determines the owning worker:
         * the owner answers via {@link #getStatusExternal}, other workers redirect to it. Otherwise
         * the status is computed by {@link #getStatus}.
         *
         * @param tenant tenant of the component
         * @param namespace namespace of the component
         * @param name name of the component
         * @param uri request URI used to build the redirect, may be {@code null}
         * @return the component status
         * @throws WebApplicationException with a temporary redirect to the owning worker, or with an
         *         internal server error when a redirect is needed but {@code uri} is {@code null}
         * @throws RuntimeException wrapping a {@link PulsarAdminException} raised by {@link #getStatus}
         */
        public S getComponentStatus(String tenant, String namespace, String name, URI uri) {
            Function.FunctionMetaData functionMetaData = ComponentImpl.this.worker().getFunctionMetaDataManager().getFunctionMetaData(tenant, namespace, name);
            Collection<Function.Assignment> assignments = ComponentImpl.this.worker().getFunctionRuntimeManager().findFunctionAssignments(tenant, namespace, name);
            if (ComponentImpl.this.worker().getFunctionRuntimeManager().getRuntimeFactory().externallyManaged()) {
                int parallelism = functionMetaData.getFunctionDetails().getParallelism();
                // Without any assignment there is no owner to ask; iterator().next() would throw
                // NoSuchElementException, so answer like the "owner unknown" case below.
                if (assignments == null || assignments.isEmpty()) {
                    return this.emptyStatus(parallelism);
                }
                Function.Assignment assignment = assignments.iterator().next();
                boolean isOwner = ComponentImpl.this.worker().getWorkerConfig().getWorkerId().equals(assignment.getWorkerId());
                if (isOwner) {
                    return this.getStatusExternal(tenant, namespace, name, parallelism);
                }
                WorkerInfo workerInfo = this.findWorkerInMembership(assignment.getWorkerId());
                if (workerInfo == null) {
                    return this.emptyStatus(parallelism);
                }
                throw this.redirectToWorker(uri, workerInfo);
            }
            try {
                return this.getStatus(tenant, namespace, name, assignments, uri);
            }
            catch (PulsarAdminException e) {
                throw new RuntimeException(e);
            }
        }

        /**
         * Finds the entry for {@code workerId} in the current membership.
         * When several entries match, the last one wins.
         *
         * @return the matching worker info, or {@code null} if the worker is not a current member
         */
        private WorkerInfo findWorkerInMembership(String workerId) {
            List<WorkerInfo> workerInfoList = ComponentImpl.this.worker().getMembershipManager().getCurrentMembership();
            WorkerInfo match = null;
            for (WorkerInfo entry : workerInfoList) {
                if (workerId.equals(entry.getWorkerId())) {
                    match = entry;
                }
            }
            return match;
        }

        /**
         * Builds the exception that sends the client to {@code workerInfo}'s host and port.
         *
         * @return a temporary-redirect exception, or an internal-server-error exception when
         *         {@code uri} is {@code null} and no redirect target can be built
         */
        private WebApplicationException redirectToWorker(URI uri, WorkerInfo workerInfo) {
            if (uri == null) {
                return new WebApplicationException(Response.serverError().status(Response.Status.INTERNAL_SERVER_ERROR).build());
            }
            URI redirect = UriBuilder.fromUri((URI)uri).host(workerInfo.getWorkerHostname()).port(workerInfo.getPort()).build(new Object[0]);
            return new WebApplicationException(Response.temporaryRedirect((URI)redirect).build());
        }
    }
}

