/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.worker.rest.api;

import com.google.common.base.Preconditions;
import com.google.gson.Gson;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Base64;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.common.policies.data.ErrorData;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.shaded.com.google.protobuf.AbstractMessage;
import org.apache.pulsar.functions.shaded.com.google.protobuf.MessageOrBuilder;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
import org.apache.pulsar.functions.worker.MembershipManager;
import org.apache.pulsar.functions.worker.Utils;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.request.RequestResult;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
import org.glassfish.jersey.media.multipart.FormDataParam;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FunctionsImpl {
    private static final Logger log = LoggerFactory.getLogger(FunctionsImpl.class);
    private final Supplier<WorkerService> workerServiceSupplier;

    public FunctionsImpl(Supplier<WorkerService> workerServiceSupplier) {
        this.workerServiceSupplier = workerServiceSupplier;
    }

    private WorkerService worker() {
        try {
            return (WorkerService)Preconditions.checkNotNull((Object)this.workerServiceSupplier.get());
        }
        catch (Throwable t) {
            log.info("Failed to get worker service", t);
            throw t;
        }
    }

    private boolean isWorkerServiceAvailable() {
        WorkerService workerService = this.workerServiceSupplier.get();
        if (workerService == null) {
            return false;
        }
        return workerService.isInitialized();
    }

    @POST
    @Path(value="/{tenant}/{namespace}/{functionName}")
    @Consumes(value={"multipart/form-data"})
    public Response registerFunction(@PathParam(value="tenant") String tenant, @PathParam(value="namespace") String namespace, @PathParam(value="functionName") String functionName, @FormDataParam(value="data") InputStream uploadedInputStream, @FormDataParam(value="data") FormDataContentDisposition fileDetail, @FormDataParam(value="functionDetails") String functionDetailsJson) {
        Function.FunctionDetails functionDetails;
        if (!this.isWorkerServiceAvailable()) {
            return this.getUnavailableResponse();
        }
        try {
            functionDetails = this.validateUpdateRequestParams(tenant, namespace, functionName, uploadedInputStream, fileDetail, functionDetailsJson);
        }
        catch (IllegalArgumentException e) {
            log.error("Invalid register function request @ /{}/{}/{}", new Object[]{tenant, namespace, functionName, e});
            return Response.status((Response.Status)Response.Status.BAD_REQUEST).type("application/json").entity((Object)new ErrorData(e.getMessage())).build();
        }
        FunctionMetaDataManager functionMetaDataManager = this.worker().getFunctionMetaDataManager();
        if (functionMetaDataManager.containsFunction(tenant, namespace, functionName)) {
            log.error("Function {}/{}/{} already exists", new Object[]{tenant, namespace, functionName});
            return Response.status((Response.Status)Response.Status.BAD_REQUEST).type("application/json").entity((Object)new ErrorData(String.format("Function %s already exists", functionName))).build();
        }
        Function.FunctionMetaData.Builder functionMetaDataBuilder = Function.FunctionMetaData.newBuilder().setFunctionDetails(functionDetails).setCreateTime(System.currentTimeMillis()).setVersion(0L);
        Function.PackageLocationMetaData.Builder packageLocationMetaDataBuilder = Function.PackageLocationMetaData.newBuilder().setPackagePath(FunctionsImpl.createPackagePath(tenant, namespace, functionName, fileDetail.getFileName()));
        functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
        return this.updateRequest(functionMetaDataBuilder.build(), uploadedInputStream);
    }

    @PUT
    @Path(value="/{tenant}/{namespace}/{functionName}")
    @Consumes(value={"multipart/form-data"})
    public Response updateFunction(@PathParam(value="tenant") String tenant, @PathParam(value="namespace") String namespace, @PathParam(value="functionName") String functionName, @FormDataParam(value="data") InputStream uploadedInputStream, @FormDataParam(value="data") FormDataContentDisposition fileDetail, @FormDataParam(value="functionDetails") String functionDetailsJson) {
        Function.FunctionDetails functionDetails;
        if (!this.isWorkerServiceAvailable()) {
            return this.getUnavailableResponse();
        }
        try {
            functionDetails = this.validateUpdateRequestParams(tenant, namespace, functionName, uploadedInputStream, fileDetail, functionDetailsJson);
        }
        catch (IllegalArgumentException e) {
            log.error("Invalid update function request @ /{}/{}/{}", new Object[]{tenant, namespace, functionName, e});
            return Response.status((Response.Status)Response.Status.BAD_REQUEST).type("application/json").entity((Object)new ErrorData(e.getMessage())).build();
        }
        FunctionMetaDataManager functionMetaDataManager = this.worker().getFunctionMetaDataManager();
        if (!functionMetaDataManager.containsFunction(tenant, namespace, functionName)) {
            return Response.status((Response.Status)Response.Status.BAD_REQUEST).type("application/json").entity((Object)new ErrorData(String.format("Function %s doesn't exist", functionName))).build();
        }
        Function.FunctionMetaData.Builder functionMetaDataBuilder = Function.FunctionMetaData.newBuilder().setFunctionDetails(functionDetails).setCreateTime(System.currentTimeMillis()).setVersion(0L);
        Function.PackageLocationMetaData.Builder packageLocationMetaDataBuilder = Function.PackageLocationMetaData.newBuilder().setPackagePath(FunctionsImpl.createPackagePath(tenant, namespace, functionName, fileDetail.getFileName()));
        functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
        return this.updateRequest(functionMetaDataBuilder.build(), uploadedInputStream);
    }

    @DELETE
    @Path(value="/{tenant}/{namespace}/{functionName}")
    public Response deregisterFunction(@PathParam(value="tenant") String tenant, @PathParam(value="namespace") String namespace, @PathParam(value="functionName") String functionName) {
        if (!this.isWorkerServiceAvailable()) {
            return this.getUnavailableResponse();
        }
        try {
            this.validateDeregisterRequestParams(tenant, namespace, functionName);
        }
        catch (IllegalArgumentException e) {
            log.error("Invalid deregister function request @ /{}/{}/{}", new Object[]{tenant, namespace, functionName, e});
            return Response.status((Response.Status)Response.Status.BAD_REQUEST).type("application/json").entity((Object)new ErrorData(e.getMessage())).build();
        }
        FunctionMetaDataManager functionMetaDataManager = this.worker().getFunctionMetaDataManager();
        if (!functionMetaDataManager.containsFunction(tenant, namespace, functionName)) {
            log.error("Function to deregister does not exist @ /{}/{}/{}", new Object[]{tenant, namespace, functionName});
            return Response.status((Response.Status)Response.Status.NOT_FOUND).type("application/json").entity((Object)new ErrorData(String.format("Function %s doesn't exist", functionName))).build();
        }
        CompletableFuture<RequestResult> completableFuture = functionMetaDataManager.deregisterFunction(tenant, namespace, functionName);
        RequestResult requestResult = null;
        try {
            requestResult = completableFuture.get();
            if (!requestResult.isSuccess()) {
                return Response.status((Response.Status)Response.Status.BAD_REQUEST).type("application/json").entity((Object)new ErrorData(requestResult.getMessage())).build();
            }
        }
        catch (ExecutionException e) {
            log.error("Execution Exception while deregistering function @ /{}/{}/{}", new Object[]{tenant, namespace, functionName, e});
            return Response.serverError().type("application/json").entity((Object)new ErrorData(e.getCause().getMessage())).build();
        }
        catch (InterruptedException e) {
            log.error("Interrupted Exception while deregistering function @ /{}/{}/{}", new Object[]{tenant, namespace, functionName, e});
            return Response.status((Response.Status)Response.Status.REQUEST_TIMEOUT).type("application/json").build();
        }
        return Response.status((Response.Status)Response.Status.OK).entity((Object)requestResult.toJson()).build();
    }

    @GET
    @Path(value="/{tenant}/{namespace}/{functionName}")
    public Response getFunctionInfo(@PathParam(value="tenant") String tenant, @PathParam(value="namespace") String namespace, @PathParam(value="functionName") String functionName) throws IOException {
        if (!this.isWorkerServiceAvailable()) {
            return this.getUnavailableResponse();
        }
        try {
            this.validateGetFunctionRequestParams(tenant, namespace, functionName);
        }
        catch (IllegalArgumentException e) {
            log.error("Invalid getFunction request @ /{}/{}/{}", new Object[]{tenant, namespace, functionName, e});
            return Response.status((Response.Status)Response.Status.BAD_REQUEST).type("application/json").entity((Object)new ErrorData(e.getMessage())).build();
        }
        FunctionMetaDataManager functionMetaDataManager = this.worker().getFunctionMetaDataManager();
        if (!functionMetaDataManager.containsFunction(tenant, namespace, functionName)) {
            log.error("Function in getFunction does not exist @ /{}/{}/{}", new Object[]{tenant, namespace, functionName});
            return Response.status((Response.Status)Response.Status.NOT_FOUND).type("application/json").entity((Object)new ErrorData(String.format("Function %s doesn't exist", functionName))).build();
        }
        Function.FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, functionName);
        String functionDetailsJson = org.apache.pulsar.functions.utils.Utils.printJson((MessageOrBuilder)functionMetaData.getFunctionDetails());
        return Response.status((Response.Status)Response.Status.OK).entity((Object)functionDetailsJson).build();
    }

    @GET
    @Path(value="/{tenant}/{namespace}/{functionName}/{instanceId}/status")
    public Response getFunctionInstanceStatus(@PathParam(value="tenant") String tenant, @PathParam(value="namespace") String namespace, @PathParam(value="functionName") String functionName, @PathParam(value="instanceId") String instanceId) throws IOException {
        if (!this.isWorkerServiceAvailable()) {
            return this.getUnavailableResponse();
        }
        try {
            this.validateGetFunctionInstanceRequestParams(tenant, namespace, functionName, instanceId);
        }
        catch (IllegalArgumentException e) {
            log.error("Invalid getFunctionStatus request @ /{}/{}/{}", new Object[]{tenant, namespace, functionName, e});
            return Response.status((Response.Status)Response.Status.BAD_REQUEST).type("application/json").entity((Object)new ErrorData(e.getMessage())).build();
        }
        FunctionMetaDataManager functionMetaDataManager = this.worker().getFunctionMetaDataManager();
        if (!functionMetaDataManager.containsFunction(tenant, namespace, functionName)) {
            log.error("Function in getFunctionStatus does not exist @ /{}/{}/{}", new Object[]{tenant, namespace, functionName});
            return Response.status((Response.Status)Response.Status.NOT_FOUND).type("application/json").entity((Object)new ErrorData(String.format("Function %s doesn't exist", functionName))).build();
        }
        FunctionRuntimeManager functionRuntimeManager = this.worker().getFunctionRuntimeManager();
        InstanceCommunication.FunctionStatus functionStatus = null;
        try {
            functionStatus = functionRuntimeManager.getFunctionInstanceStatus(tenant, namespace, functionName, Integer.parseInt(instanceId));
        }
        catch (Exception e) {
            log.error("Got Exception Getting Status", (Throwable)e);
            InstanceCommunication.FunctionStatus.Builder functionStatusBuilder = InstanceCommunication.FunctionStatus.newBuilder();
            functionStatusBuilder.setRunning(false);
            String functionDetailsJson = org.apache.pulsar.functions.utils.Utils.printJson((MessageOrBuilder)functionStatusBuilder.build());
            return Response.status((Response.Status)Response.Status.OK).entity((Object)functionDetailsJson).build();
        }
        String jsonResponse = org.apache.pulsar.functions.utils.Utils.printJson((MessageOrBuilder)functionStatus);
        return Response.status((Response.Status)Response.Status.OK).entity((Object)jsonResponse).build();
    }

    @GET
    @Path(value="/{tenant}/{namespace}/{functionName}/status")
    public Response getFunctionStatus(@PathParam(value="tenant") String tenant, @PathParam(value="namespace") String namespace, @PathParam(value="functionName") String functionName) throws IOException {
        if (!this.isWorkerServiceAvailable()) {
            return this.getUnavailableResponse();
        }
        try {
            this.validateGetFunctionRequestParams(tenant, namespace, functionName);
        }
        catch (IllegalArgumentException e) {
            log.error("Invalid getFunctionStatus request @ /{}/{}/{}", new Object[]{tenant, namespace, functionName, e});
            return Response.status((Response.Status)Response.Status.BAD_REQUEST).type("application/json").entity((Object)new ErrorData(e.getMessage())).build();
        }
        FunctionMetaDataManager functionMetaDataManager = this.worker().getFunctionMetaDataManager();
        if (!functionMetaDataManager.containsFunction(tenant, namespace, functionName)) {
            log.error("Function in getFunctionStatus does not exist @ /{}/{}/{}", new Object[]{tenant, namespace, functionName});
            return Response.status((Response.Status)Response.Status.NOT_FOUND).type("application/json").entity((Object)new ErrorData(String.format("Function %s doesn't exist", functionName))).build();
        }
        FunctionRuntimeManager functionRuntimeManager = this.worker().getFunctionRuntimeManager();
        InstanceCommunication.FunctionStatusList functionStatusList = null;
        try {
            functionStatusList = functionRuntimeManager.getAllFunctionStatus(tenant, namespace, functionName);
        }
        catch (Exception e) {
            log.error("Got Exception Getting Status", (Throwable)e);
            InstanceCommunication.FunctionStatus.Builder functionStatusBuilder = InstanceCommunication.FunctionStatus.newBuilder();
            functionStatusBuilder.setRunning(false);
            String functionDetailsJson = org.apache.pulsar.functions.utils.Utils.printJson((MessageOrBuilder)functionStatusBuilder.build());
            return Response.status((Response.Status)Response.Status.OK).entity((Object)functionDetailsJson).build();
        }
        String jsonResponse = org.apache.pulsar.functions.utils.Utils.printJson((MessageOrBuilder)functionStatusList);
        return Response.status((Response.Status)Response.Status.OK).entity((Object)jsonResponse).build();
    }

    @GET
    @Path(value="/{tenant}/{namespace}")
    public Response listFunctions(@PathParam(value="tenant") String tenant, @PathParam(value="namespace") String namespace) {
        if (!this.isWorkerServiceAvailable()) {
            return this.getUnavailableResponse();
        }
        try {
            this.validateListFunctionRequestParams(tenant, namespace);
        }
        catch (IllegalArgumentException e) {
            log.error("Invalid listFunctions request @ /{}/{}", new Object[]{tenant, namespace, e});
            return Response.status((Response.Status)Response.Status.BAD_REQUEST).type("application/json").entity((Object)new ErrorData(e.getMessage())).build();
        }
        FunctionMetaDataManager functionMetaDataManager = this.worker().getFunctionMetaDataManager();
        Collection<String> functionStateList = functionMetaDataManager.listFunctions(tenant, namespace);
        return Response.status((Response.Status)Response.Status.OK).entity((Object)new Gson().toJson((Object)functionStateList.toArray())).build();
    }

    private Response updateRequest(Function.FunctionMetaData functionMetaData, InputStream uploadedInputStream) {
        try {
            log.info("Uploading function package to {}", (Object)functionMetaData.getPackageLocation());
            Utils.uploadToBookeeper(this.worker().getDlogNamespace(), uploadedInputStream, functionMetaData.getPackageLocation().getPackagePath());
        }
        catch (IOException e) {
            log.error("Error uploading file {}", (Object)functionMetaData.getPackageLocation(), (Object)e);
            return Response.serverError().type("application/json").entity((Object)new ErrorData(e.getMessage())).build();
        }
        FunctionMetaDataManager functionMetaDataManager = this.worker().getFunctionMetaDataManager();
        CompletableFuture<RequestResult> completableFuture = functionMetaDataManager.updateFunction(functionMetaData);
        RequestResult requestResult = null;
        try {
            requestResult = completableFuture.get();
            if (!requestResult.isSuccess()) {
                return Response.status((Response.Status)Response.Status.BAD_REQUEST).type("application/json").entity((Object)new ErrorData(requestResult.getMessage())).build();
            }
        }
        catch (ExecutionException e) {
            return Response.serverError().type("application/json").entity((Object)new ErrorData(e.getCause().getMessage())).build();
        }
        catch (InterruptedException e) {
            return Response.status((Response.Status)Response.Status.REQUEST_TIMEOUT).type("application/json").entity((Object)new ErrorData(e.getCause().getMessage())).build();
        }
        return Response.status((Response.Status)Response.Status.OK).build();
    }

    @GET
    @Path(value="/cluster")
    public Response getCluster() {
        if (!this.isWorkerServiceAvailable()) {
            return this.getUnavailableResponse();
        }
        MembershipManager membershipManager = this.worker().getMembershipManager();
        List<MembershipManager.WorkerInfo> members = membershipManager.getCurrentMembership();
        return Response.status((Response.Status)Response.Status.OK).entity((Object)new Gson().toJson(members)).build();
    }

    @GET
    @Path(value="/assignments")
    public Response getAssignments() {
        if (!this.isWorkerServiceAvailable()) {
            return this.getUnavailableResponse();
        }
        FunctionRuntimeManager functionRuntimeManager = this.worker().getFunctionRuntimeManager();
        Map<String, Map<String, Function.Assignment>> assignments = functionRuntimeManager.getCurrentAssignments();
        HashMap<String, Set<String>> ret = new HashMap<String, Set<String>>();
        for (Map.Entry<String, Map<String, Function.Assignment>> entry : assignments.entrySet()) {
            ret.put(entry.getKey(), entry.getValue().keySet());
        }
        return Response.status((Response.Status)Response.Status.OK).entity((Object)new Gson().toJson(ret)).build();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @POST
    @Path(value="/{tenant}/{namespace}/{functionName}/trigger")
    @Consumes(value={"multipart/form-data"})
    public Response triggerFunction(@PathParam(value="tenant") String tenant, @PathParam(value="namespace") String namespace, @PathParam(value="name") String functionName, @FormDataParam(value="data") String input, @FormDataParam(value="dataStream") InputStream uploadedInputStream, @FormDataParam(value="topic") String topic) {
        String inputTopicToWrite;
        if (!this.isWorkerServiceAvailable()) {
            return this.getUnavailableResponse();
        }
        try {
            this.validateTriggerRequestParams(tenant, namespace, functionName, topic, input, uploadedInputStream);
        }
        catch (IllegalArgumentException e) {
            log.error("Invalid trigger function request @ /{}/{}/{}", new Object[]{tenant, namespace, functionName, e});
            return Response.status((Response.Status)Response.Status.BAD_REQUEST).type("application/json").entity((Object)new ErrorData(e.getMessage())).build();
        }
        FunctionMetaDataManager functionMetaDataManager = this.worker().getFunctionMetaDataManager();
        if (!functionMetaDataManager.containsFunction(tenant, namespace, functionName)) {
            log.error("Function in getFunction does not exist @ /{}/{}/{}", new Object[]{tenant, namespace, functionName});
            return Response.status((Response.Status)Response.Status.NOT_FOUND).type("application/json").entity((Object)new ErrorData(String.format("Function %s doesn't exist", functionName))).build();
        }
        Function.FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, functionName);
        if (topic != null) {
            inputTopicToWrite = topic;
        } else if (functionMetaData.getFunctionDetails().getSource().getTopicsToSerDeClassNameMap().size() == 1) {
            inputTopicToWrite = (String)functionMetaData.getFunctionDetails().getSource().getTopicsToSerDeClassNameMap().keySet().iterator().next();
        } else {
            return Response.status((Response.Status)Response.Status.BAD_REQUEST).build();
        }
        if (functionMetaData.getFunctionDetails().getSource().getTopicsToSerDeClassNameMap() == null || !functionMetaData.getFunctionDetails().getSource().getTopicsToSerDeClassNameMap().containsKey(inputTopicToWrite)) {
            return Response.status((Response.Status)Response.Status.BAD_REQUEST).build();
        }
        String outputTopic = functionMetaData.getFunctionDetails().getSink().getTopic();
        Reader reader = null;
        Producer producer = null;
        try {
            Message msg;
            byte[] targetArray;
            if (outputTopic != null && !outputTopic.isEmpty()) {
                reader = this.worker().getClient().newReader().topic(outputTopic).startMessageId(MessageId.latest).create();
            }
            producer = this.worker().getClient().newProducer().topic(inputTopicToWrite).create();
            if (uploadedInputStream != null) {
                targetArray = new byte[uploadedInputStream.available()];
                uploadedInputStream.read(targetArray);
            } else {
                targetArray = input.getBytes();
            }
            MessageId msgId = producer.send((Object)targetArray);
            if (reader == null) {
                Response response = Response.status((Response.Status)Response.Status.OK).build();
                return response;
            }
            long curTime = System.currentTimeMillis();
            long maxTime = curTime + 1000L;
            while (curTime < maxTime && (msg = reader.readNext(10000, TimeUnit.MILLISECONDS)) != null) {
                MessageId newMsgId;
                if (msg.getProperties().containsKey("__pfn_input_msg_id__") && msg.getProperties().containsKey("__pfn_input_topic__") && msgId.equals(newMsgId = MessageId.fromByteArray((byte[])Base64.getDecoder().decode((String)msg.getProperties().get("__pfn_input_msg_id__")))) && msg.getProperties().get("__pfn_input_topic__").equals(inputTopicToWrite)) {
                    Response response = Response.status((Response.Status)Response.Status.OK).entity((Object)msg.getData()).build();
                    return response;
                }
                curTime = System.currentTimeMillis();
            }
            Response response = Response.status((Response.Status)Response.Status.REQUEST_TIMEOUT).build();
            return response;
        }
        catch (Exception e) {
            Response response = Response.status((Response.Status)Response.Status.INTERNAL_SERVER_ERROR).build();
            return response;
        }
        finally {
            if (reader != null) {
                reader.closeAsync();
            }
            if (producer != null) {
                producer.closeAsync();
            }
        }
    }

    @POST
    @Path(value="/upload")
    @Consumes(value={"multipart/form-data"})
    public Response uploadFunction(@FormDataParam(value="data") InputStream uploadedInputStream, @FormDataParam(value="path") String path) {
        try {
            if (uploadedInputStream == null || path == null) {
                throw new IllegalArgumentException("Function Package is not provided " + path);
            }
        }
        catch (IllegalArgumentException e) {
            log.error("Invalid upload function request @ /{}", (Object)path, (Object)e);
            return Response.status((Response.Status)Response.Status.BAD_REQUEST).type("application/json").entity((Object)new ErrorData(e.getMessage())).build();
        }
        try {
            log.info("Uploading function package to {}", (Object)path);
            Utils.uploadToBookeeper(this.worker().getDlogNamespace(), uploadedInputStream, Codec.encode((String)path));
        }
        catch (IOException e) {
            log.error("Error uploading file {}", (Object)path, (Object)e);
            return Response.serverError().type("application/json").entity((Object)new ErrorData(e.getMessage())).build();
        }
        return Response.status((Response.Status)Response.Status.OK).build();
    }

    @GET
    @Path(value="/download")
    public Response downloadFunction(final @QueryParam(value="path") String path) {
        return Response.status((Response.Status)Response.Status.OK).entity((Object)new StreamingOutput(){

            public void write(OutputStream output) throws IOException {
                Utils.downloadFromBookkeeper(FunctionsImpl.this.worker().getDlogNamespace(), output, Codec.encode((String)path));
            }
        }).build();
    }

    private void validateListFunctionRequestParams(String tenant, String namespace) throws IllegalArgumentException {
        if (tenant == null) {
            throw new IllegalArgumentException("Tenant is not provided");
        }
        if (namespace == null) {
            throw new IllegalArgumentException("Namespace is not provided");
        }
    }

    private void validateGetFunctionInstanceRequestParams(String tenant, String namespace, String functionName, String instanceId) throws IllegalArgumentException {
        this.validateGetFunctionRequestParams(tenant, namespace, functionName);
        if (instanceId == null) {
            throw new IllegalArgumentException("Function Instance Id is not provided");
        }
    }

    private void validateGetFunctionRequestParams(String tenant, String namespace, String functionName) throws IllegalArgumentException {
        if (tenant == null) {
            throw new IllegalArgumentException("Tenant is not provided");
        }
        if (namespace == null) {
            throw new IllegalArgumentException("Namespace is not provided");
        }
        if (functionName == null) {
            throw new IllegalArgumentException("Function Name is not provided");
        }
    }

    private void validateDeregisterRequestParams(String tenant, String namespace, String functionName) throws IllegalArgumentException {
        if (tenant == null) {
            throw new IllegalArgumentException("Tenant is not provided");
        }
        if (namespace == null) {
            throw new IllegalArgumentException("Namespace is not provided");
        }
        if (functionName == null) {
            throw new IllegalArgumentException("Function Name is not provided");
        }
    }

    private Function.FunctionDetails validateUpdateRequestParams(String tenant, String namespace, String functionName, InputStream uploadedInputStream, FormDataContentDisposition fileDetail, String functionDetailsJson) throws IllegalArgumentException {
        if (tenant == null) {
            throw new IllegalArgumentException("Tenant is not provided");
        }
        if (namespace == null) {
            throw new IllegalArgumentException("Namespace is not provided");
        }
        if (functionName == null) {
            throw new IllegalArgumentException("Function Name is not provided");
        }
        if (uploadedInputStream == null || fileDetail == null) {
            throw new IllegalArgumentException("Function Package is not provided");
        }
        if (functionDetailsJson == null) {
            throw new IllegalArgumentException("FunctionDetails is not provided");
        }
        try {
            Function.FunctionDetails.Builder functionDetailsBuilder = Function.FunctionDetails.newBuilder();
            org.apache.pulsar.functions.utils.Utils.mergeJson((String)functionDetailsJson, (AbstractMessage.Builder)functionDetailsBuilder);
            Function.FunctionDetails functionDetails = functionDetailsBuilder.build();
            LinkedList<String> missingFields = new LinkedList<String>();
            if (functionDetails.getTenant() == null || functionDetails.getTenant().isEmpty()) {
                missingFields.add("Tenant");
            }
            if (functionDetails.getNamespace() == null || functionDetails.getNamespace().isEmpty()) {
                missingFields.add("Namespace");
            }
            if (functionDetails.getName() == null || functionDetails.getName().isEmpty()) {
                missingFields.add("Name");
            }
            if (functionDetails.getClassName() == null || functionDetails.getClassName().isEmpty()) {
                missingFields.add("ClassName");
            }
            if (!functionDetails.getSource().isInitialized()) {
                missingFields.add("Source");
            } else if (functionDetails.getSource().getTopicsToSerDeClassNameMap().isEmpty()) {
                missingFields.add("Source Topics Serde Map");
            }
            if (!missingFields.isEmpty()) {
                String errorMessage = StringUtils.join(missingFields, (String)",");
                throw new IllegalArgumentException(errorMessage + " is not provided");
            }
            if (functionDetails.getParallelism() <= 0) {
                throw new IllegalArgumentException("Parallelism needs to be set to a positive number");
            }
            return functionDetails;
        }
        catch (IllegalArgumentException ex) {
            throw ex;
        }
        catch (Exception ex) {
            throw new IllegalArgumentException("Invalid FunctionDetails");
        }
    }

    private void validateTriggerRequestParams(String tenant, String namespace, String functionName, String topic, String input, InputStream uploadedInputStream) {
        if (tenant == null) {
            throw new IllegalArgumentException("Tenant is not provided");
        }
        if (namespace == null) {
            throw new IllegalArgumentException("Namespace is not provided");
        }
        if (functionName == null) {
            throw new IllegalArgumentException("Function Name is not provided");
        }
        if (uploadedInputStream == null && input == null) {
            throw new IllegalArgumentException("Trigger Data is not provided");
        }
    }

    private Response getUnavailableResponse() {
        return Response.status((Response.Status)Response.Status.SERVICE_UNAVAILABLE).type("application/json").entity((Object)new ErrorData("Function worker service is not done initializing. Please try again in a little while.")).build();
    }

    public static String createPackagePath(String tenant, String namespace, String functionName, String fileName) {
        return String.format("%s/%s/%s/%s", tenant, namespace, Codec.encode((String)functionName), Utils.getUniquePackageName(Codec.encode((String)fileName)));
    }
}

