/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.worker.rest.api;

import com.google.common.base.Preconditions;
import com.google.gson.Gson;
import com.google.protobuf.AbstractMessage;
import com.google.protobuf.MessageOrBuilder;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.MalformedURLException;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Base64;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput;
import net.jodah.typetools.TypeResolver;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.policies.data.ErrorData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.utils.Reflections;
import org.apache.pulsar.functions.utils.functioncache.FunctionClassLoaders;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
import org.apache.pulsar.functions.worker.MembershipManager;
import org.apache.pulsar.functions.worker.Utils;
import org.apache.pulsar.functions.worker.WorkerInfo;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.request.RequestResult;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.Source;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FunctionsImpl {
    /** Class-wide SLF4J logger. */
    private static final Logger log = LoggerFactory.getLogger(FunctionsImpl.class);
    /**
     * Source of the {@link WorkerService}. It is queried on every call rather than cached,
     * and may yield {@code null} (see {@code isWorkerServiceAvailable}).
     */
    private final Supplier<WorkerService> workerServiceSupplier;

    /**
     * Creates the REST implementation on top of a lazily resolved worker service.
     *
     * @param workerServiceSupplier supplier consulted each time the worker service is needed;
     *                              stored as-is without validation
     */
    public FunctionsImpl(Supplier<WorkerService> workerServiceSupplier) {
        this.workerServiceSupplier = workerServiceSupplier;
    }

    /**
     * Resolves the current {@link WorkerService} from the supplier.
     *
     * @return the non-null worker service
     * @throws NullPointerException if the supplier yields {@code null}; any failure
     *         (including one thrown by the supplier itself) is logged and rethrown unchanged
     */
    private WorkerService worker() {
        try {
            final WorkerService service = this.workerServiceSupplier.get();
            return Preconditions.checkNotNull(service);
        } catch (Throwable failure) {
            log.info("Failed to get worker service", failure);
            throw failure;
        }
    }

    /**
     * Tells whether requests can be served right now.
     *
     * @return {@code true} only when the supplier yields a worker service and that
     *         service reports itself as initialized
     */
    private boolean isWorkerServiceAvailable() {
        final WorkerService candidate = this.workerServiceSupplier.get();
        return candidate != null && candidate.isInitialized();
    }

    /**
     * Registers a new function under {@code tenant/namespace/functionName}.
     *
     * <p>Outcomes, in the order they are checked:
     * <ul>
     *   <li>the unavailable response when the worker service is not ready;</li>
     *   <li>{@code UNAUTHORIZED} when {@code clientRole} fails the role check,
     *       {@code INTERNAL_SERVER_ERROR} when the role check itself throws;</li>
     *   <li>{@code BAD_REQUEST} when request validation fails or the function already exists;</li>
     *   <li>otherwise the result of submitting the metadata via {@code updateRequest}. The uploaded
     *       stream is stored first unless a package URL was given or the code is builtin.</li>
     * </ul>
     *
     * @param uploadedInputStream package contents; only used when no URL is given and the code is not builtin
     * @param fileDetail          multipart details; its file name is used to build the package path in that same case
     * @param functionPkgUrl      optional package URL; a non-blank value selects URL-based validation
     * @param functionDetailsJson JSON function details handed to the validation helpers
     * @param clientRole          role checked against the tenant
     */
    public Response registerFunction(String tenant, String namespace, String functionName, InputStream uploadedInputStream, FormDataContentDisposition fileDetail, String functionPkgUrl, String functionDetailsJson, String clientRole) {
        if (!this.isWorkerServiceAvailable()) {
            return this.getUnavailableResponse();
        }

        // Authorization gate.
        try {
            final boolean authorized = this.isAuthorizedRole(tenant, clientRole);
            if (!authorized) {
                log.error("{}/{}/{} Client [{}] is not admin and authorized to register function", tenant, namespace, functionName, clientRole);
                return Response.status(Response.Status.UNAUTHORIZED)
                        .type("application/json")
                        .entity(new ErrorData("client is not authorize to perform operation"))
                        .build();
            }
        } catch (PulsarAdminException authFailure) {
            log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, functionName, authFailure);
            return Response.status(Response.Status.INTERNAL_SERVER_ERROR)
                    .type("application/json")
                    .entity(new ErrorData(authFailure.getMessage()))
                    .build();
        }

        // Request validation; the helper used depends on whether a package URL was supplied.
        final boolean hasPkgUrl = StringUtils.isNotBlank(functionPkgUrl);
        Function.FunctionDetails details;
        try {
            if (hasPkgUrl) {
                details = this.validateUpdateRequestParamsWithPkgUrl(tenant, namespace, functionName, functionPkgUrl, functionDetailsJson);
            } else {
                details = this.validateUpdateRequestParams(tenant, namespace, functionName, uploadedInputStream, fileDetail, functionDetailsJson);
            }
        } catch (Exception invalid) {
            log.error("Invalid register function request @ /{}/{}/{}", tenant, namespace, functionName, invalid);
            return Response.status(Response.Status.BAD_REQUEST)
                    .type("application/json")
                    .entity(new ErrorData(invalid.getMessage()))
                    .build();
        }

        // Registration must not overwrite an existing function.
        final FunctionMetaDataManager metaDataManager = this.worker().getFunctionMetaDataManager();
        if (metaDataManager.containsFunction(tenant, namespace, functionName)) {
            log.error("Function {}/{}/{} already exists", tenant, namespace, functionName);
            return Response.status(Response.Status.BAD_REQUEST)
                    .type("application/json")
                    .entity(new ErrorData(String.format("Function %s already exists", functionName)))
                    .build();
        }

        final Function.FunctionMetaData.Builder metaDataBuilder = Function.FunctionMetaData.newBuilder()
                .setFunctionDetails(details)
                .setCreateTime(System.currentTimeMillis())
                .setVersion(0L);

        // Package location: builtin marker, caller-supplied URL, or a freshly generated path.
        final boolean builtin = this.isFunctionCodeBuiltin(details);
        final String packagePath;
        if (builtin) {
            packagePath = "builtin://" + this.getFunctionCodeBuiltin(details);
        } else if (hasPkgUrl) {
            packagePath = functionPkgUrl;
        } else {
            packagePath = FunctionsImpl.createPackagePath(tenant, namespace, functionName, fileDetail.getFileName());
        }
        final Function.PackageLocationMetaData.Builder location = Function.PackageLocationMetaData.newBuilder();
        location.setPackagePath(packagePath);
        metaDataBuilder.setPackageLocation(location);

        final Function.FunctionMetaData metaData = metaDataBuilder.build();
        if (hasPkgUrl || builtin) {
            // Nothing to upload: the package lives elsewhere.
            return this.updateRequest(metaData);
        }
        return this.updateRequest(metaData, uploadedInputStream);
    }

    /**
     * Updates an already registered function under {@code tenant/namespace/functionName}.
     *
     * <p>Outcomes, in the order they are checked:
     * <ul>
     *   <li>the unavailable response when the worker service is not ready;</li>
     *   <li>{@code UNAUTHORIZED} when {@code clientRole} fails the role check,
     *       {@code INTERNAL_SERVER_ERROR} when the role check itself throws;</li>
     *   <li>{@code BAD_REQUEST} when request validation fails or the function does not exist;</li>
     *   <li>otherwise the result of submitting the metadata via {@code updateRequest}. The uploaded
     *       stream is stored first unless a package URL was given or the code is builtin.</li>
     * </ul>
     *
     * @param uploadedInputStream package contents; only used when no URL is given and the code is not builtin
     * @param fileDetail          multipart details; its file name is used to build the package path in that same case
     * @param functionPkgUrl      optional package URL; a non-blank value selects URL-based validation
     * @param functionDetailsJson JSON function details handed to the validation helpers
     * @param clientRole          role checked against the tenant
     */
    public Response updateFunction(String tenant, String namespace, String functionName, InputStream uploadedInputStream, FormDataContentDisposition fileDetail, String functionPkgUrl, String functionDetailsJson, String clientRole) {
        Function.FunctionDetails functionDetails;
        if (!this.isWorkerServiceAvailable()) {
            return this.getUnavailableResponse();
        }
        try {
            if (!this.isAuthorizedRole(tenant, clientRole)) {
                log.error("{}/{}/{} Client [{}] is not admin and authorized to update function", tenant, namespace, functionName, clientRole);
                return Response.status(Response.Status.UNAUTHORIZED).type("application/json").entity(new ErrorData("client is not authorize to perform operation")).build();
            }
        } catch (PulsarAdminException e) {
            log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, functionName, e);
            return Response.status(Response.Status.INTERNAL_SERVER_ERROR).type("application/json").entity(new ErrorData(e.getMessage())).build();
        }
        boolean isPkgUrlProvided = StringUtils.isNotBlank(functionPkgUrl);
        try {
            functionDetails = isPkgUrlProvided
                    ? this.validateUpdateRequestParamsWithPkgUrl(tenant, namespace, functionName, functionPkgUrl, functionDetailsJson)
                    : this.validateUpdateRequestParams(tenant, namespace, functionName, uploadedInputStream, fileDetail, functionDetailsJson);
        } catch (Exception e) {
            // This is the update path; the message previously said "register" (copy-paste slip).
            log.error("Invalid update function request @ /{}/{}/{}", tenant, namespace, functionName, e);
            return Response.status(Response.Status.BAD_REQUEST).type("application/json").entity(new ErrorData(e.getMessage())).build();
        }
        FunctionMetaDataManager functionMetaDataManager = this.worker().getFunctionMetaDataManager();
        if (!functionMetaDataManager.containsFunction(tenant, namespace, functionName)) {
            return Response.status(Response.Status.BAD_REQUEST).type("application/json").entity(new ErrorData(String.format("Function %s doesn't exist", functionName))).build();
        }
        Function.FunctionMetaData.Builder functionMetaDataBuilder = Function.FunctionMetaData.newBuilder()
                .setFunctionDetails(functionDetails)
                .setCreateTime(System.currentTimeMillis())
                .setVersion(0L);
        Function.PackageLocationMetaData.Builder packageLocationMetaDataBuilder = Function.PackageLocationMetaData.newBuilder();
        boolean isBuiltin = this.isFunctionCodeBuiltin(functionDetails);
        if (isBuiltin) {
            packageLocationMetaDataBuilder.setPackagePath("builtin://" + this.getFunctionCodeBuiltin(functionDetails));
        } else {
            packageLocationMetaDataBuilder.setPackagePath(isPkgUrlProvided ? functionPkgUrl : FunctionsImpl.createPackagePath(tenant, namespace, functionName, fileDetail.getFileName()));
        }
        functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
        // A package URL or builtin code means there is no stream to upload.
        return isPkgUrlProvided || isBuiltin
                ? this.updateRequest(functionMetaDataBuilder.build())
                : this.updateRequest(functionMetaDataBuilder.build(), uploadedInputStream);
    }

    /**
     * Removes the function {@code tenant/namespace/functionName}.
     *
     * <p>Outcomes, in the order they are checked:
     * <ul>
     *   <li>the unavailable response when the worker service is not ready;</li>
     *   <li>{@code UNAUTHORIZED} when {@code clientRole} fails the role check,
     *       {@code INTERNAL_SERVER_ERROR} when the role check itself throws;</li>
     *   <li>{@code BAD_REQUEST} when a path parameter is missing;</li>
     *   <li>{@code NOT_FOUND} when the function is unknown;</li>
     *   <li>{@code BAD_REQUEST} when the metadata manager reports an unsuccessful result,
     *       a server error when the request fails, {@code REQUEST_TIMEOUT} when the wait is interrupted;</li>
     *   <li>{@code OK} with the JSON form of the request result on success.</li>
     * </ul>
     */
    public Response deregisterFunction(String tenant, String namespace, String functionName, String clientRole) {
        if (!this.isWorkerServiceAvailable()) {
            return this.getUnavailableResponse();
        }
        try {
            if (!this.isAuthorizedRole(tenant, clientRole)) {
                log.error("{}/{}/{} Client [{}] is not admin and authorized to deregister function", tenant, namespace, functionName, clientRole);
                return Response.status(Response.Status.UNAUTHORIZED).type("application/json").entity(new ErrorData("client is not authorize to perform operation")).build();
            }
        } catch (PulsarAdminException e) {
            log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, functionName, e);
            return Response.status(Response.Status.INTERNAL_SERVER_ERROR).type("application/json").entity(new ErrorData(e.getMessage())).build();
        }
        try {
            this.validateDeregisterRequestParams(tenant, namespace, functionName);
        } catch (IllegalArgumentException e) {
            log.error("Invalid deregister function request @ /{}/{}/{}", tenant, namespace, functionName, e);
            return Response.status(Response.Status.BAD_REQUEST).type("application/json").entity(new ErrorData(e.getMessage())).build();
        }
        FunctionMetaDataManager functionMetaDataManager = this.worker().getFunctionMetaDataManager();
        if (!functionMetaDataManager.containsFunction(tenant, namespace, functionName)) {
            log.error("Function to deregister does not exist @ /{}/{}/{}", tenant, namespace, functionName);
            return Response.status(Response.Status.NOT_FOUND).type("application/json").entity(new ErrorData(String.format("Function %s doesn't exist", functionName))).build();
        }
        CompletableFuture<RequestResult> completableFuture = functionMetaDataManager.deregisterFunction(tenant, namespace, functionName);
        final RequestResult requestResult;
        try {
            requestResult = completableFuture.get();
        } catch (ExecutionException e) {
            log.error("Execution Exception while deregistering function @ /{}/{}/{}", tenant, namespace, functionName, e);
            // Fall back to the wrapper itself so a cause-less exception cannot trigger an NPE here.
            Throwable cause = e.getCause() != null ? e.getCause() : e;
            return Response.serverError().type("application/json").entity(new ErrorData(cause.getMessage())).build();
        } catch (InterruptedException e) {
            // Restore the flag so the container thread still observes the interruption.
            Thread.currentThread().interrupt();
            log.error("Interrupted Exception while deregistering function @ /{}/{}/{}", tenant, namespace, functionName, e);
            return Response.status(Response.Status.REQUEST_TIMEOUT).type("application/json").build();
        }
        if (!requestResult.isSuccess()) {
            return Response.status(Response.Status.BAD_REQUEST).type("application/json").entity(new ErrorData(requestResult.getMessage())).build();
        }
        return Response.status(Response.Status.OK).entity(requestResult.toJson()).build();
    }

    /**
     * Returns the stored function details of {@code tenant/namespace/functionName} as JSON.
     *
     * <p>Yields the unavailable response when the worker service is not ready,
     * {@code BAD_REQUEST} when a parameter is missing and {@code NOT_FOUND} when the
     * function is unknown.
     *
     * @throws IOException if rendering the details as JSON fails
     */
    public Response getFunctionInfo(String tenant, String namespace, String functionName) throws IOException {
        if (!this.isWorkerServiceAvailable()) {
            return this.getUnavailableResponse();
        }

        try {
            this.validateGetFunctionRequestParams(tenant, namespace, functionName);
        } catch (IllegalArgumentException invalid) {
            log.error("Invalid getFunction request @ /{}/{}/{}", tenant, namespace, functionName, invalid);
            return Response.status(Response.Status.BAD_REQUEST)
                    .type("application/json")
                    .entity(new ErrorData(invalid.getMessage()))
                    .build();
        }

        final FunctionMetaDataManager metaDataManager = this.worker().getFunctionMetaDataManager();
        final boolean known = metaDataManager.containsFunction(tenant, namespace, functionName);
        if (!known) {
            log.error("Function in getFunction does not exist @ /{}/{}/{}", tenant, namespace, functionName);
            return Response.status(Response.Status.NOT_FOUND)
                    .type("application/json")
                    .entity(new ErrorData(String.format("Function %s doesn't exist", functionName)))
                    .build();
        }

        final Function.FunctionMetaData metaData = metaDataManager.getFunctionMetaData(tenant, namespace, functionName);
        final String detailsJson = org.apache.pulsar.functions.utils.Utils.printJson(metaData.getFunctionDetails());
        return Response.status(Response.Status.OK).entity(detailsJson).build();
    }

    /**
     * Returns the status of one instance of {@code tenant/namespace/functionName} as JSON.
     *
     * <p>Yields the unavailable response when the worker service is not ready,
     * {@code BAD_REQUEST} when a parameter is missing and {@code NOT_FOUND} when the
     * function is unknown. Any exception while fetching the status, including a
     * non-numeric {@code instanceId}, is logged and answered with {@code OK} carrying a
     * status whose {@code running} flag is {@code false}.
     *
     * @param instanceId instance number in decimal text form
     * @throws IOException if rendering the status as JSON fails
     */
    public Response getFunctionInstanceStatus(String tenant, String namespace, String functionName, String instanceId) throws IOException {
        if (!this.isWorkerServiceAvailable()) {
            return this.getUnavailableResponse();
        }

        try {
            this.validateGetFunctionInstanceRequestParams(tenant, namespace, functionName, instanceId);
        } catch (IllegalArgumentException invalid) {
            log.error("Invalid getFunctionStatus request @ /{}/{}/{}", tenant, namespace, functionName, invalid);
            return Response.status(Response.Status.BAD_REQUEST)
                    .type("application/json")
                    .entity(new ErrorData(invalid.getMessage()))
                    .build();
        }

        final FunctionMetaDataManager metaDataManager = this.worker().getFunctionMetaDataManager();
        if (!metaDataManager.containsFunction(tenant, namespace, functionName)) {
            log.error("Function in getFunctionStatus does not exist @ /{}/{}/{}", tenant, namespace, functionName);
            return Response.status(Response.Status.NOT_FOUND)
                    .type("application/json")
                    .entity(new ErrorData(String.format("Function %s doesn't exist", functionName)))
                    .build();
        }

        final FunctionRuntimeManager runtimeManager = this.worker().getFunctionRuntimeManager();
        InstanceCommunication.FunctionStatus status;
        try {
            final int instanceNumber = Integer.parseInt(instanceId);
            status = runtimeManager.getFunctionInstanceStatus(tenant, namespace, functionName, instanceNumber);
        } catch (Exception failure) {
            log.error("Got Exception Getting Status", failure);
            // Report "not running" instead of an error status.
            final InstanceCommunication.FunctionStatus notRunning =
                    InstanceCommunication.FunctionStatus.newBuilder().setRunning(false).build();
            final String fallbackJson = org.apache.pulsar.functions.utils.Utils.printJson(notRunning);
            return Response.status(Response.Status.OK).entity(fallbackJson).build();
        }

        final String statusJson = org.apache.pulsar.functions.utils.Utils.printJson(status);
        return Response.status(Response.Status.OK).entity(statusJson).build();
    }

    /**
     * Returns the status of every instance of {@code tenant/namespace/functionName} as JSON.
     *
     * <p>Yields the unavailable response when the worker service is not ready,
     * {@code BAD_REQUEST} when a parameter is missing and {@code NOT_FOUND} when the
     * function is unknown. Any exception while fetching the statuses is logged and answered
     * with {@code OK} carrying a single status whose {@code running} flag is {@code false}.
     *
     * @throws IOException if rendering the status as JSON fails
     */
    public Response getFunctionStatus(String tenant, String namespace, String functionName) throws IOException {
        if (!this.isWorkerServiceAvailable()) {
            return this.getUnavailableResponse();
        }

        try {
            this.validateGetFunctionRequestParams(tenant, namespace, functionName);
        } catch (IllegalArgumentException invalid) {
            log.error("Invalid getFunctionStatus request @ /{}/{}/{}", tenant, namespace, functionName, invalid);
            return Response.status(Response.Status.BAD_REQUEST)
                    .type("application/json")
                    .entity(new ErrorData(invalid.getMessage()))
                    .build();
        }

        final FunctionMetaDataManager metaDataManager = this.worker().getFunctionMetaDataManager();
        if (!metaDataManager.containsFunction(tenant, namespace, functionName)) {
            log.error("Function in getFunctionStatus does not exist @ /{}/{}/{}", tenant, namespace, functionName);
            return Response.status(Response.Status.NOT_FOUND)
                    .type("application/json")
                    .entity(new ErrorData(String.format("Function %s doesn't exist", functionName)))
                    .build();
        }

        final FunctionRuntimeManager runtimeManager = this.worker().getFunctionRuntimeManager();
        InstanceCommunication.FunctionStatusList statuses;
        try {
            statuses = runtimeManager.getAllFunctionStatus(tenant, namespace, functionName);
        } catch (Exception failure) {
            log.error("Got Exception Getting Status", failure);
            // Report "not running" instead of an error status.
            final InstanceCommunication.FunctionStatus notRunning =
                    InstanceCommunication.FunctionStatus.newBuilder().setRunning(false).build();
            final String fallbackJson = org.apache.pulsar.functions.utils.Utils.printJson(notRunning);
            return Response.status(Response.Status.OK).entity(fallbackJson).build();
        }

        final String statusesJson = org.apache.pulsar.functions.utils.Utils.printJson(statuses);
        return Response.status(Response.Status.OK).entity(statusesJson).build();
    }

    /**
     * Lists the functions registered under {@code tenant/namespace} as a JSON array.
     *
     * <p>Yields the unavailable response when the worker service is not ready and
     * {@code BAD_REQUEST} when the tenant or namespace is missing.
     */
    public Response listFunctions(String tenant, String namespace) {
        if (!this.isWorkerServiceAvailable()) {
            return this.getUnavailableResponse();
        }

        try {
            this.validateListFunctionRequestParams(tenant, namespace);
        } catch (IllegalArgumentException invalid) {
            log.error("Invalid listFunctions request @ /{}/{}", tenant, namespace, invalid);
            return Response.status(Response.Status.BAD_REQUEST)
                    .type("application/json")
                    .entity(new ErrorData(invalid.getMessage()))
                    .build();
        }

        final Collection<String> functionNames =
                this.worker().getFunctionMetaDataManager().listFunctions(tenant, namespace);
        // Serialized as an array, not as the collection object itself.
        final Object[] asArray = functionNames.toArray();
        final String json = new Gson().toJson(asArray);
        return Response.status(Response.Status.OK).entity(json).build();
    }

    /**
     * Stores the uploaded package at the location named in the metadata, then submits
     * the metadata through the single-argument {@code updateRequest}.
     *
     * @param functionMetaData    metadata whose package location determines the upload path
     * @param uploadedInputStream package contents to store
     * @return a server-error response when the upload throws {@link IOException};
     *         otherwise whatever the metadata submission returns
     */
    private Response updateRequest(Function.FunctionMetaData functionMetaData, InputStream uploadedInputStream) {
        try {
            log.info("Uploading function package to {}", functionMetaData.getPackageLocation());
            final String destination = functionMetaData.getPackageLocation().getPackagePath();
            Utils.uploadToBookeeper(this.worker().getDlogNamespace(), uploadedInputStream, destination);
        } catch (IOException uploadFailure) {
            log.error("Error uploading file {}", functionMetaData.getPackageLocation(), uploadFailure);
            return Response.serverError()
                    .type("application/json")
                    .entity(new ErrorData(uploadFailure.getMessage()))
                    .build();
        }
        return this.updateRequest(functionMetaData);
    }

    /**
     * Submits function metadata to the metadata manager and waits for the outcome.
     *
     * @param functionMetaData metadata to submit
     * @return {@code OK} on success; {@code BAD_REQUEST} with the manager's message when the
     *         result is unsuccessful; a server error when the request fails with an
     *         {@link ExecutionException}; {@code REQUEST_TIMEOUT} when the wait is interrupted
     */
    private Response updateRequest(Function.FunctionMetaData functionMetaData) {
        FunctionMetaDataManager functionMetaDataManager = this.worker().getFunctionMetaDataManager();
        CompletableFuture<RequestResult> completableFuture = functionMetaDataManager.updateFunction(functionMetaData);
        final RequestResult requestResult;
        try {
            requestResult = completableFuture.get();
        } catch (ExecutionException e) {
            // Fall back to the wrapper itself so a cause-less exception cannot trigger an NPE here.
            Throwable cause = e.getCause() != null ? e.getCause() : e;
            return Response.serverError().type("application/json").entity(new ErrorData(cause.getMessage())).build();
        } catch (InterruptedException e) {
            // Restore the flag so the container thread still observes the interruption.
            Thread.currentThread().interrupt();
            // InterruptedException normally carries no cause, so e.getCause().getMessage() would throw an NPE.
            String reason = e.getMessage() != null ? e.getMessage() : "Interrupted while waiting for the function update to complete";
            return Response.status(Response.Status.REQUEST_TIMEOUT).type("application/json").entity(new ErrorData(reason)).build();
        }
        if (!requestResult.isSuccess()) {
            return Response.status(Response.Status.BAD_REQUEST).type("application/json").entity(new ErrorData(requestResult.getMessage())).build();
        }
        return Response.status(Response.Status.OK).build();
    }

    /**
     * Returns the connectors reported by the worker's connectors manager.
     *
     * @throws WebApplicationException carrying a {@code SERVICE_UNAVAILABLE} response when
     *         the worker service is not ready
     */
    public List<ConnectorDefinition> getListOfConnectors() {
        if (this.isWorkerServiceAvailable()) {
            return this.worker().getConnectorsManager().getConnectors();
        }
        final Response unavailable = Response.status(Response.Status.SERVICE_UNAVAILABLE)
                .type("application/json")
                .entity(new ErrorData("Function worker service is not avaialable"))
                .build();
        throw new WebApplicationException(unavailable);
    }

    /**
     * Returns the current membership as reported by the worker's membership manager.
     *
     * @throws WebApplicationException carrying a {@code SERVICE_UNAVAILABLE} response when
     *         the worker service is not ready
     */
    public List<WorkerInfo> getWorkers() {
        if (this.isWorkerServiceAvailable()) {
            return this.worker().getMembershipManager().getCurrentMembership();
        }
        final Response unavailable = Response.status(Response.Status.SERVICE_UNAVAILABLE)
                .type("application/json")
                .entity(new ErrorData("Function worker service is not avaialable"))
                .build();
        throw new WebApplicationException(unavailable);
    }

    /**
     * Returns the current membership, serialized to JSON with Gson, in an {@code OK} response;
     * yields the unavailable response when the worker service is not ready.
     */
    public Response getCluster() {
        if (!this.isWorkerServiceAvailable()) {
            return this.getUnavailableResponse();
        }
        final List<WorkerInfo> currentMembers = this.worker().getMembershipManager().getCurrentMembership();
        final String json = new Gson().toJson(currentMembers);
        return Response.status(Response.Status.OK).entity(json).build();
    }

    /**
     * Returns, as JSON, each outer key of the runtime manager's current assignments mapped
     * to the set of inner keys assigned under it; yields the unavailable response when the
     * worker service is not ready.
     */
    public Response getAssignments() {
        if (!this.isWorkerServiceAvailable()) {
            return this.getUnavailableResponse();
        }
        final Map<String, Map<String, Function.Assignment>> current =
                this.worker().getFunctionRuntimeManager().getCurrentAssignments();
        // Only the keys are exposed; the assignment payloads themselves are dropped.
        final Map<String, Set<String>> keysByOuterKey = new HashMap<>();
        current.forEach((outerKey, inner) -> keysByOuterKey.put(outerKey, inner.keySet()));
        final String json = new Gson().toJson(keysByOuterKey);
        return Response.status(Response.Status.OK).entity(json).build();
    }

    /**
     * Publishes one message to an input topic of {@code tenant/namespace/functionName} and,
     * when the function's sink has a topic, waits for the matching output message.
     *
     * <p>Outcomes:
     * <ul>
     *   <li>the unavailable response when the worker service is not ready;</li>
     *   <li>{@code BAD_REQUEST} when validation fails, when no {@code topic} is given and the
     *       function does not have exactly one input topic, or when the chosen topic is not
     *       one of the function's input topics;</li>
     *   <li>{@code NOT_FOUND} when the function is unknown;</li>
     *   <li>{@code OK} without a body when the sink has no topic, or {@code OK} with the
     *       output payload when a message tagged with the sent message id and input topic is read;</li>
     *   <li>{@code REQUEST_TIMEOUT} when no matching output shows up;</li>
     *   <li>{@code INTERNAL_SERVER_ERROR} on any other failure (now logged).</li>
     * </ul>
     *
     * @param input               textual payload, used when {@code uploadedInputStream} is {@code null}
     * @param uploadedInputStream binary payload; read completely when present
     * @param topic               input topic to write to, or {@code null} to use the function's only input topic
     */
    public Response triggerFunction(String tenant, String namespace, String functionName, String input, InputStream uploadedInputStream, String topic) {
        if (!this.isWorkerServiceAvailable()) {
            return this.getUnavailableResponse();
        }
        try {
            this.validateTriggerRequestParams(tenant, namespace, functionName, topic, input, uploadedInputStream);
        } catch (IllegalArgumentException e) {
            log.error("Invalid trigger function request @ /{}/{}/{}", tenant, namespace, functionName, e);
            return Response.status(Response.Status.BAD_REQUEST).type("application/json").entity(new ErrorData(e.getMessage())).build();
        }
        FunctionMetaDataManager functionMetaDataManager = this.worker().getFunctionMetaDataManager();
        if (!functionMetaDataManager.containsFunction(tenant, namespace, functionName)) {
            log.error("Function in trigger function does not exist @ /{}/{}/{}", tenant, namespace, functionName);
            return Response.status(Response.Status.NOT_FOUND).type("application/json").entity(new ErrorData(String.format("Function %s doesn't exist", functionName))).build();
        }
        Function.FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, functionName);

        // Resolve the topic to write to. The null check runs before any dereference of the map.
        Map<String, String> inputTopics = functionMetaData.getFunctionDetails().getSource().getTopicsToSerDeClassNameMap();
        if (inputTopics == null) {
            return Response.status(Response.Status.BAD_REQUEST).build();
        }
        final String inputTopicToWrite;
        if (topic != null) {
            inputTopicToWrite = topic;
        } else if (inputTopics.size() == 1) {
            inputTopicToWrite = inputTopics.keySet().iterator().next();
        } else {
            return Response.status(Response.Status.BAD_REQUEST).build();
        }
        if (!inputTopics.containsKey(inputTopicToWrite)) {
            return Response.status(Response.Status.BAD_REQUEST).build();
        }

        String outputTopic = functionMetaData.getFunctionDetails().getSink().getTopic();
        Reader<byte[]> reader = null;
        Producer<byte[]> producer = null;
        try {
            if (outputTopic != null && !outputTopic.isEmpty()) {
                // Open the reader before sending so the output cannot be missed.
                reader = this.worker().getClient().newReader().topic(outputTopic).startMessageId(MessageId.latest).create();
            }
            producer = this.worker().getClient().newProducer().topic(inputTopicToWrite).create();

            // Read the whole stream: available() is only an estimate and a single read()
            // may return fewer bytes than requested, which would truncate or pad the payload.
            final byte[] payload = uploadedInputStream != null
                    ? IOUtils.toByteArray(uploadedInputStream)
                    : input.getBytes();
            MessageId msgId = producer.send(payload);
            if (reader == null) {
                return Response.status(Response.Status.OK).build();
            }

            // NOTE(review): the deadline is 1s but each read may block for up to 10s, so the
            // effective wait can exceed the deadline — confirm which bound is intended.
            long curTime = System.currentTimeMillis();
            final long maxTime = curTime + 1000L;
            while (curTime < maxTime) {
                Message<byte[]> msg = reader.readNext(10000, TimeUnit.MILLISECONDS);
                if (msg == null) {
                    break;
                }
                Map<String, String> properties = msg.getProperties();
                if (properties.containsKey("__pfn_input_msg_id__") && properties.containsKey("__pfn_input_topic__")) {
                    MessageId newMsgId = MessageId.fromByteArray(Base64.getDecoder().decode(properties.get("__pfn_input_msg_id__")));
                    if (msgId.equals(newMsgId) && properties.get("__pfn_input_topic__").equals(inputTopicToWrite)) {
                        return Response.status(Response.Status.OK).entity(msg.getData()).build();
                    }
                }
                curTime = System.currentTimeMillis();
            }
            return Response.status(Response.Status.REQUEST_TIMEOUT).build();
        } catch (Exception e) {
            // Previously swallowed silently; log it so a 500 can be diagnosed.
            log.error("Failed to trigger function @ /{}/{}/{}", tenant, namespace, functionName, e);
            return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
        } finally {
            if (reader != null) {
                reader.closeAsync();
            }
            if (producer != null) {
                producer.closeAsync();
            }
        }
    }

    /**
     * Stores an uploaded function package under the encoded form of {@code path}.
     *
     * @param uploadedInputStream package contents
     * @param path                destination path; passed through {@code Codec.encode} before the upload
     * @return {@code BAD_REQUEST} when either argument is {@code null}, a server error when
     *         the upload throws {@link IOException}, {@code OK} otherwise
     */
    public Response uploadFunction(InputStream uploadedInputStream, String path) {
        final boolean argumentMissing = uploadedInputStream == null || path == null;
        if (argumentMissing) {
            final IllegalArgumentException invalid =
                    new IllegalArgumentException("Function Package is not provided " + path);
            log.error("Invalid upload function request @ /{}", path, invalid);
            return Response.status(Response.Status.BAD_REQUEST)
                    .type("application/json")
                    .entity(new ErrorData(invalid.getMessage()))
                    .build();
        }

        try {
            log.info("Uploading function package to {}", path);
            final String encodedPath = Codec.encode(path);
            Utils.uploadToBookeeper(this.worker().getDlogNamespace(), uploadedInputStream, encodedPath);
        } catch (IOException uploadFailure) {
            log.error("Error uploading file {}", path, uploadFailure);
            return Response.serverError()
                    .type("application/json")
                    .entity(new ErrorData(uploadFailure.getMessage()))
                    .build();
        }
        return Response.status(Response.Status.OK).build();
    }

    /**
     * Streams a function package back to the caller.
     *
     * <p>The source depends on the prefix of {@code path}: an HTTP URL is fetched, a file URL
     * is read from the local file system, and anything else is downloaded from BookKeeper
     * using the encoded path. The copy happens lazily, when the response entity is written.
     *
     * <p>NOTE(review): {@code path} is used as given, so a caller can make this worker fetch
     * arbitrary URLs or read arbitrary local files — confirm that access is restricted upstream.
     *
     * @param path package location
     * @return an {@code OK} response whose entity performs the copy
     */
    public Response downloadFunction(final String path) {
        final StreamingOutput body = new StreamingOutput() {

            @Override
            public void write(OutputStream output) throws IOException {
                if (path.startsWith(org.apache.pulsar.functions.utils.Utils.HTTP)) {
                    // Close the remote stream once copied; it used to be leaked.
                    try (InputStream in = new URL(path).openStream()) {
                        IOUtils.copy(in, output);
                    }
                } else if (path.startsWith(org.apache.pulsar.functions.utils.Utils.FILE)) {
                    final File file;
                    try {
                        file = new File(new URL(path).toURI());
                    } catch (URISyntaxException e) {
                        // Keep the cause so the malformed part of the URL stays diagnosable.
                        throw new IllegalArgumentException("invalid file url path: " + path, e);
                    }
                    // Close the file handle once copied; it used to be leaked.
                    try (InputStream in = new FileInputStream(file)) {
                        IOUtils.copy(in, output);
                    }
                } else {
                    Utils.downloadFromBookkeeper(FunctionsImpl.this.worker().getDlogNamespace(), output, Codec.encode(path));
                }
            }
        };
        return Response.status(Response.Status.OK).entity(body).build();
    }

    /**
     * Checks the parameters of a list-functions request.
     *
     * @throws IllegalArgumentException naming the first missing parameter, tenant before namespace
     */
    private void validateListFunctionRequestParams(String tenant, String namespace) throws IllegalArgumentException {
        final String problem;
        if (tenant == null) {
            problem = "Tenant is not provided";
        } else if (namespace == null) {
            problem = "Namespace is not provided";
        } else {
            problem = null;
        }
        if (problem != null) {
            throw new IllegalArgumentException(problem);
        }
    }

    /**
     * Checks the parameters of a per-instance request: the usual tenant/namespace/function
     * checks run first, then the instance id must be present.
     *
     * @throws IllegalArgumentException naming the first missing parameter
     */
    private void validateGetFunctionInstanceRequestParams(String tenant, String namespace, String functionName, String instanceId) throws IllegalArgumentException {
        this.validateGetFunctionRequestParams(tenant, namespace, functionName);
        final boolean hasInstanceId = instanceId != null;
        if (hasInstanceId) {
            return;
        }
        throw new IllegalArgumentException("Function Instance Id is not provided");
    }

    /**
     * Ensures the parameters of a "get function" request are present.
     *
     * @param tenant       tenant name; must not be {@code null}
     * @param namespace    namespace name; must not be {@code null}
     * @param functionName function name; must not be {@code null}
     * @throws IllegalArgumentException naming the first missing parameter, checked in the
     *                                  order tenant, namespace, function name
     */
    private void validateGetFunctionRequestParams(String tenant, String namespace, String functionName) throws IllegalArgumentException {
        final String problem = tenant == null
                ? "Tenant is not provided"
                : namespace == null
                        ? "Namespace is not provided"
                        : functionName == null
                                ? "Function Name is not provided"
                                : null;
        if (problem != null) {
            throw new IllegalArgumentException(problem);
        }
    }

    /**
     * Ensures the parameters of a "deregister function" request are present.
     *
     * @param tenant       tenant name; must not be {@code null}
     * @param namespace    namespace name; must not be {@code null}
     * @param functionName function name; must not be {@code null}
     * @throws IllegalArgumentException naming the first missing parameter, checked in the
     *                                  order tenant, namespace, function name
     */
    private void validateDeregisterRequestParams(String tenant, String namespace, String functionName) throws IllegalArgumentException {
        // Parallel arrays: values[i] being null is reported with messages[i].
        final String[] values = {tenant, namespace, functionName};
        final String[] messages = {
            "Tenant is not provided",
            "Namespace is not provided",
            "Function Name is not provided"
        };
        for (int i = 0; i < values.length; i++) {
            if (values[i] == null) {
                throw new IllegalArgumentException(messages[i]);
            }
        }
    }

    /**
     * Validates an update/register request whose package is referenced by URL.
     *
     * <p>The URL must be accepted by {@code Utils.isFunctionPackageUrlSupported}, and is then
     * handed to {@code Utils.validateFileUrl} together with the worker's download directory.
     * When the URL uses the file prefix, the referenced file is passed on so the function
     * class types can be validated against it; otherwise no jar is passed.
     *
     * @return the validated function details parsed from {@code functionDetailsJson}
     * @throws IllegalArgumentException if the URL is unsupported or the request is invalid
     * @throws IOException              declared for URL construction failures
     * @throws URISyntaxException       if the file URL cannot be converted to a URI
     */
    private Function.FunctionDetails validateUpdateRequestParamsWithPkgUrl(String tenant, String namespace, String functionName, String functionPkgUrl, String functionDetailsJson) throws IllegalArgumentException, IOException, URISyntaxException {
        final boolean supported = org.apache.pulsar.functions.utils.Utils.isFunctionPackageUrlSupported((String)functionPkgUrl);
        if (!supported) {
            throw new IllegalArgumentException("Function Package url is not valid. supported url (http/https/file)");
        }
        Utils.validateFileUrl(functionPkgUrl, this.workerServiceSupplier.get().getWorkerConfig().getDownloadDirectory());

        // Only file urls give us a local jar to inspect.
        File localJar = null;
        if (functionPkgUrl.startsWith(org.apache.pulsar.functions.utils.Utils.FILE)) {
            localJar = new File(new URL(functionPkgUrl).toURI());
        }
        return this.validateUpdateRequestParams(tenant, namespace, functionName, functionDetailsJson, localJar);
    }

    /**
     * Validates an update/register request whose package is uploaded with the request.
     *
     * <p>The function details are validated first (without a jar). Unless the details refer
     * to a builtin source or sink, both the uploaded stream and its file details must be present.
     *
     * @return the validated function details parsed from {@code functionDetailsJson}
     * @throws IllegalArgumentException if the details are invalid or the package is missing
     */
    private Function.FunctionDetails validateUpdateRequestParams(String tenant, String namespace, String functionName, InputStream uploadedInputStream, FormDataContentDisposition fileDetail, String functionDetailsJson) throws IllegalArgumentException {
        final Function.FunctionDetails details = this.validateUpdateRequestParams(tenant, namespace, functionName, functionDetailsJson, null);
        if (this.isFunctionCodeBuiltin(details)) {
            return details;
        }
        if (uploadedInputStream == null || fileDetail == null) {
            throw new IllegalArgumentException("Function Package is not provided");
        }
        return details;
    }

    /**
     * Tells whether the function details reference a builtin source or sink.
     *
     * @param functionDetails details to inspect
     * @return {@code true} if a source is set with a non-empty builtin name, or, failing
     *         that, a sink is set with a non-empty builtin name
     */
    private boolean isFunctionCodeBuiltin(Function.FunctionDetails functionDetails) {
        if (functionDetails.hasSource()) {
            Function.SourceSpec source = functionDetails.getSource();
            if (!StringUtils.isEmpty((CharSequence)source.getBuiltin())) {
                return true;
            }
        }
        if (!functionDetails.hasSink()) {
            return false;
        }
        Function.SinkSpec sink = functionDetails.getSink();
        return !StringUtils.isEmpty((CharSequence)sink.getBuiltin());
    }

    /**
     * Returns the builtin name referenced by the function details, preferring the source.
     *
     * @param functionDetails details to inspect
     * @return the source's builtin name if a source is set and the name is non-empty;
     *         otherwise the sink's builtin name under the same conditions; otherwise {@code null}
     */
    private String getFunctionCodeBuiltin(Function.FunctionDetails functionDetails) {
        if (functionDetails.hasSource()) {
            Function.SourceSpec source = functionDetails.getSource();
            if (!StringUtils.isEmpty((CharSequence)source.getBuiltin())) {
                return source.getBuiltin();
            }
        }
        if (functionDetails.hasSink()) {
            Function.SinkSpec sink = functionDetails.getSink();
            if (!StringUtils.isEmpty((CharSequence)sink.getBuiltin())) {
                return sink.getBuiltin();
            }
        }
        return null;
    }

    /**
     * Core validation for update/register requests.
     *
     * <p>Checks that the request parameters are present, merges {@code functionDetailsJson}
     * into a {@code FunctionDetails} builder, validates the function class types against
     * {@code jarWithFileUrl} (skipped by that helper when the jar is {@code null}), and then
     * verifies that the required fields of the built details are populated and that
     * parallelism is positive.
     *
     * @param tenant              tenant name; must not be {@code null}
     * @param namespace           namespace name; must not be {@code null}
     * @param functionName        function name; must not be {@code null}
     * @param functionDetailsJson JSON form of the function details; must not be {@code null}
     * @param jarWithFileUrl      local jar used for class-type validation, or {@code null}
     * @return the built and validated function details
     * @throws IllegalArgumentException if a parameter or required field is missing, if
     *         parallelism is not positive, or if any other failure occurs while parsing or
     *         validating (reported as "Invalid FunctionDetails" with the failure as cause)
     */
    private Function.FunctionDetails validateUpdateRequestParams(String tenant, String namespace, String functionName, String functionDetailsJson, File jarWithFileUrl) throws IllegalArgumentException {
        if (tenant == null) {
            throw new IllegalArgumentException("Tenant is not provided");
        }
        if (namespace == null) {
            throw new IllegalArgumentException("Namespace is not provided");
        }
        if (functionName == null) {
            throw new IllegalArgumentException("Function Name is not provided");
        }
        if (functionDetailsJson == null) {
            throw new IllegalArgumentException("FunctionDetails is not provided");
        }
        try {
            Function.FunctionDetails.Builder functionDetailsBuilder = Function.FunctionDetails.newBuilder();
            org.apache.pulsar.functions.utils.Utils.mergeJson((String)functionDetailsJson, (AbstractMessage.Builder)functionDetailsBuilder);
            // May fill in source/sink type class names on the builder before it is built.
            this.validateFunctionClassTypes(jarWithFileUrl, functionDetailsBuilder);
            Function.FunctionDetails functionDetails = functionDetailsBuilder.build();

            // Collect every missing field so the caller gets a single, complete message.
            List<String> missingFields = new LinkedList<String>();
            if (functionDetails.getTenant() == null || functionDetails.getTenant().isEmpty()) {
                missingFields.add("Tenant");
            }
            if (functionDetails.getNamespace() == null || functionDetails.getNamespace().isEmpty()) {
                missingFields.add("Namespace");
            }
            if (functionDetails.getName() == null || functionDetails.getName().isEmpty()) {
                missingFields.add("Name");
            }
            if (functionDetails.getClassName() == null || functionDetails.getClassName().isEmpty()) {
                missingFields.add("ClassName");
            }
            if (!functionDetails.getSource().isInitialized()) {
                missingFields.add("Source");
            }
            if (!functionDetails.getSink().isInitialized()) {
                missingFields.add("Sink");
            }
            if (!missingFields.isEmpty()) {
                String errorMessage = StringUtils.join(missingFields, (String)",");
                throw new IllegalArgumentException(errorMessage + " is not provided");
            }
            if (functionDetails.getParallelism() <= 0) {
                throw new IllegalArgumentException("Parallelism needs to be set to a positive number");
            }
            return functionDetails;
        }
        catch (IllegalArgumentException ex) {
            // Already carries a caller-facing message; pass it through untouched.
            throw ex;
        }
        catch (Exception ex) {
            // Same message as before, but keep the underlying failure as the cause so it
            // is not lost when the request is rejected.
            throw new IllegalArgumentException("Invalid FunctionDetails", ex);
        }
    }

    /*
     * Unable to fully structure code
     */
    private void validateFunctionClassTypes(File jarFile, Function.FunctionDetails.Builder functionDetailsBuilder) throws MalformedURLException {
        if (jarFile == null) {
            return;
        }
        if (StringUtils.isBlank((CharSequence)functionDetailsBuilder.getClassName())) {
            throw new IllegalArgumentException("function class-name can't be empty");
        }
        urls = new URL[]{jarFile.toURI().toURL()};
        classLoader = FunctionClassLoaders.create((URL[])urls, (ClassLoader)FunctionClassLoaders.class.getClassLoader());
        functionObject = Reflections.createInstance((String)functionDetailsBuilder.getClassName(), (ClassLoader)classLoader);
        typeArgs = org.apache.pulsar.functions.utils.Utils.getFunctionTypes((Object)functionObject, (boolean)false);
        if (!(functionObject instanceof org.apache.pulsar.functions.api.Function) && !(functionObject instanceof Function)) {
            throw new RuntimeException("User class must either be Function or java.util.Function");
        }
        if (functionDetailsBuilder.hasSource() && functionDetailsBuilder.getSource() != null && StringUtils.isNotBlank((CharSequence)functionDetailsBuilder.getSource().getClassName())) {
            try {
                sourceClassName = functionDetailsBuilder.getSource().getClassName();
                argClassName = this.getTypeArg(sourceClassName, Source.class, classLoader).getName();
                functionDetailsBuilder.setSource(functionDetailsBuilder.getSourceBuilder().setTypeClassName(argClassName));
                if (functionDetailsBuilder.hasSink() && !StringUtils.isBlank((CharSequence)functionDetailsBuilder.getSink().getClassName())) ** GOTO lbl29
                functionDetailsBuilder.setSink(functionDetailsBuilder.getSinkBuilder().setTypeClassName(argClassName));
            }
            catch (IllegalArgumentException ie) {
                throw ie;
            }
            catch (Exception e) {
                FunctionsImpl.log.error("Failed to validate source class", (Throwable)e);
                throw new IllegalArgumentException("Failed to validate source class-name", e);
            }
        } else if (StringUtils.isBlank((CharSequence)functionDetailsBuilder.getSourceBuilder().getTypeClassName())) {
            functionDetailsBuilder.setSource(functionDetailsBuilder.getSourceBuilder().setTypeClassName(typeArgs[0].getName()));
        }
lbl29:
        // 5 sources

        if (functionDetailsBuilder.hasSink() && functionDetailsBuilder.getSink() != null && StringUtils.isNotBlank((CharSequence)functionDetailsBuilder.getSink().getClassName())) {
            try {
                sinkClassName = functionDetailsBuilder.getSink().getClassName();
                argClassName = this.getTypeArg(sinkClassName, Sink.class, classLoader).getName();
                functionDetailsBuilder.setSink(functionDetailsBuilder.getSinkBuilder().setTypeClassName(argClassName));
                if (functionDetailsBuilder.hasSource() && !StringUtils.isBlank((CharSequence)functionDetailsBuilder.getSource().getClassName())) ** GOTO lbl47
                functionDetailsBuilder.setSource(functionDetailsBuilder.getSourceBuilder().setTypeClassName(argClassName));
            }
            catch (IllegalArgumentException ie) {
                throw ie;
            }
            catch (Exception e) {
                FunctionsImpl.log.error("Failed to validate sink class", (Throwable)e);
                throw new IllegalArgumentException("Failed to validate sink class-name", e);
            }
        } else if (StringUtils.isBlank((CharSequence)functionDetailsBuilder.getSinkBuilder().getTypeClassName())) {
            functionDetailsBuilder.setSink(functionDetailsBuilder.getSinkBuilder().setTypeClassName(typeArgs[1].getName()));
        }
lbl47:
        // 5 sources

    }

    /**
     * Loads {@code className} and resolves the raw type argument it supplies for {@code funClass}.
     *
     * @param className   name of the class to load
     * @param funClass    generic type the loaded class must be assignable to
     * @param classLoader loader used to load {@code className}
     * @return the raw type argument as resolved by {@code TypeResolver.resolveRawArgument}
     * @throws ClassNotFoundException   if the class cannot be loaded
     * @throws IllegalArgumentException if the loaded class is not assignable to {@code funClass}
     */
    private Class<?> getTypeArg(String className, Class<?> funClass, URLClassLoader classLoader) throws ClassNotFoundException {
        final Class<?> candidate = classLoader.loadClass(className);
        if (funClass.isAssignableFrom(candidate)) {
            return TypeResolver.resolveRawArgument(funClass, candidate);
        }
        throw new IllegalArgumentException(String.format("class %s is not type of %s", className, funClass.getName()));
    }

    /**
     * Ensures the parameters of a "trigger function" request are present.
     *
     * <p>Tenant, namespace and function name are mandatory, and at least one of
     * {@code input} or {@code uploadedInputStream} must be supplied. {@code topic} is not
     * checked by this method.
     *
     * @throws IllegalArgumentException naming the first missing parameter
     */
    private void validateTriggerRequestParams(String tenant, String namespace, String functionName, String topic, String input, InputStream uploadedInputStream) {
        final String problem;
        if (tenant == null) {
            problem = "Tenant is not provided";
        } else if (namespace == null) {
            problem = "Namespace is not provided";
        } else if (functionName == null) {
            problem = "Function Name is not provided";
        } else if (input != null || uploadedInputStream != null) {
            // Trigger data came from at least one of the two channels.
            problem = null;
        } else {
            problem = "Trigger Data is not provided";
        }
        if (problem != null) {
            throw new IllegalArgumentException(problem);
        }
    }

    /**
     * Builds the JSON 503 (Service Unavailable) response returned while the function worker
     * service has not finished initializing.
     *
     * @return a SERVICE_UNAVAILABLE response carrying an {@code ErrorData} entity
     */
    private Response getUnavailableResponse() {
        Response.ResponseBuilder builder = Response.status((Response.Status)Response.Status.SERVICE_UNAVAILABLE);
        builder = builder.type("application/json");
        ErrorData notReady = new ErrorData("Function worker service is not done initializing. Please try again in a little while.");
        return builder.entity((Object)notReady).build();
    }

    /**
     * Builds the storage path of a function package as
     * {@code tenant/namespace/encodedFunctionName/uniquePackageName}.
     *
     * <p>The function name is passed through {@code Codec.encode}; the file name is encoded
     * the same way and then handed to {@code Utils.getUniquePackageName}.
     *
     * @param tenant       tenant segment, used as-is
     * @param namespace    namespace segment, used as-is
     * @param functionName function name, encoded before use
     * @param fileName     package file name, encoded and made unique before use
     * @return the four segments joined with {@code /}
     */
    public static String createPackagePath(String tenant, String namespace, String functionName, String fileName) {
        final String encodedFunctionName = Codec.encode((String)functionName);
        final String encodedFileName = Codec.encode((String)fileName);
        final String uniquePackageName = Utils.getUniquePackageName(encodedFileName);
        return String.format("%s/%s/%s/%s", tenant, namespace, encodedFunctionName, uniquePackageName);
    }

    /**
     * Decides whether {@code clientRole} may act on {@code tenant}.
     *
     * <p>When authorization is disabled in the worker config, every caller is authorized.
     * Otherwise a non-null role listed among the configured super-user roles is authorized
     * immediately. For any other caller the tenant info is fetched, and a non-null role is
     * authorized when the tenant's admin roles are {@code null}, empty, or contain the role.
     * A {@code null} role is never authorized while authorization is enabled.
     *
     * @param tenant     tenant whose admin roles are consulted
     * @param clientRole role of the caller; may be {@code null}
     * @return {@code true} if the caller is authorized
     * @throws PulsarAdminException if the tenant info lookup fails
     */
    private boolean isAuthorizedRole(String tenant, String clientRole) throws PulsarAdminException {
        if (!this.worker().getWorkerConfig().isAuthorizationEnabled()) {
            return true;
        }
        final boolean hasRole = clientRole != null;
        if (hasRole && this.worker().getWorkerConfig().getSuperUserRoles().contains(clientRole)) {
            return true;
        }
        // The lookup happens even for a null role, exactly as before, so lookup failures still surface.
        final TenantInfo tenantInfo = this.worker().getAdmin().tenants().getTenantInfo(tenant);
        if (!hasRole) {
            return false;
        }
        if (tenantInfo.getAdminRoles() == null) {
            return true;
        }
        if (tenantInfo.getAdminRoles().isEmpty()) {
            return true;
        }
        return tenantInfo.getAdminRoles().contains(clientRole);
    }
}

