package org.apache.pulsar.broker.admin.impl;

import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import io.swagger.annotations.Example;
import io.swagger.annotations.ExampleProperty;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.common.functions.UpdateOptions;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.policies.data.SinkStatus;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.rest.api.SinksImpl;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
import org.glassfish.jersey.media.multipart.FormDataParam;

/* loaded from: input_file:org/apache/pulsar/broker/admin/impl/SinksBase.class */
public class SinksBase extends AdminResource implements Supplier<WorkerService> {
    private final SinksImpl sink = new SinksImpl(this);

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // java.util.function.Supplier
    public WorkerService get() {
        return pulsar().getWorkerService();
    }

    @ApiResponses({@ApiResponse(code = 400, message = "Invalid request (sink already exists, etc.)"), @ApiResponse(code = 200, message = "Pulsar Sink successfully created"), @ApiResponse(code = 500, message = "Internal server error (failed to authorize, failed to get tenant data, failed to process package, etc.)"), @ApiResponse(code = 401, message = "Client is not authorized to perform operation"), @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.")})
    @Path("/{tenant}/{namespace}/{sinkName}")
    @Consumes({"multipart/form-data"})
    @ApiOperation("Creates a new Pulsar Sink in cluster mode")
    @POST
    public void registerSink(@PathParam("tenant") @ApiParam("The sink's tenant") String str, @PathParam("namespace") @ApiParam("The sink's namespace") String str2, @PathParam("sinkName") @ApiParam("The sink's name") String str3, @FormDataParam("data") InputStream inputStream, @FormDataParam("data") FormDataContentDisposition formDataContentDisposition, @FormDataParam("url") String str4, @ApiParam(value = "A JSON value presenting a sink config playload. All available configuration options are:  \nclassname  \n   The sink's class name if archive is file-url-path (file://)  \nsourceSubscriptionName  \n   Pulsar source subscription name if user wants a specific  \n   subscription-name for input-topic consumer  \ninputs  \n   The sink's input topic or topics (specified as a JSON array)  \ntopicsPattern  \n   TopicsPattern to consume from list of topics under a namespace that    match the pattern. [input] and [topicsPattern] are mutually    exclusive. Add SerDe class name for a pattern in customSerdeInputs    (supported for java fun only)topicToSerdeClassName  \n   The map of input topics to SerDe class names (specified as a JSON object)  \ntopicToSchemaType  \n   The map of input topics to Schema types or class names (specified as a JSON object)  \ninputSpecs  \n   The map of input topics to its consumer configuration, each configuration has schema of    {\"schemaType\": \"type-x\", \"serdeClassName\": \"name-x\", \"isRegexPattern\": true, \"receiverQueueSize\": 5}  \nconfigs  \n   The map of configs (specified as a JSON object)  \nsecrets  \n   a map of secretName(aka how the secret is going to be \n   accessed in the function via context) to an object that \n   encapsulates how the secret is fetched by the underlying \n   secrets provider. The type of an value here can be found by the \n   SecretProviderConfigurator.getSecretObjectType() method. (specified as a JSON object)  \nparallelism  \n   The sink's parallelism factor (i.e. the number of sink instances to run \nprocessingGuarantees  \n   The processing guarantees (aka delivery semantics) applied to the sink. Possible Values: \"ATLEAST_ONCE\", \"ATMOST_ONCE\", \"EFFECTIVELY_ONCE\"  \nretainOrdering  \n   Boolean denotes whether sink consumes and sinks messages in order  \nresources  \n   {\"cpu\": 1, \"ram\": 2, \"disk\": 3} The CPU (in cores), RAM (in bytes) and disk (in bytes) that needs to be allocated per sink instance (applicable only to Docker runtime)  \nautoAck  \n   Boolean denotes whether or not the framework will automatically acknowledge messages  \ntimeoutMs  \n   Long denotes the message timeout in milliseconds  \ncleanupSubscription  \n   Boolean denotes whether the subscriptions the functions created/used should be deleted when the functions is deleted  \nruntimeFlags  \n   Any flags that you want to pass to the runtime as a single string  \n", examples = @Example({@ExampleProperty(mediaType = "application/json", value = "{  \n\t\"classname\": \"org.example.MySinkTest\",\n\t\"inputs\": [\"persistent://public/default/sink-input\"],\n\t\"processingGuarantees\": \"EFFECTIVELY_ONCE\",\n\t\"parallelism\": 10\n}")})) @FormDataParam("sinkConfig") String str5) {
        this.sink.registerFunction(str, str2, str3, inputStream, formDataContentDisposition, str4, str5, clientAppId(), clientAuthData());
    }

    @ApiResponses({@ApiResponse(code = 400, message = "Invalid request (sink doesn't exist, update contains no change, etc.)"), @ApiResponse(code = 200, message = "Pulsar Sink successfully updated"), @ApiResponse(code = 401, message = "Client is not authorized to perform operation"), @ApiResponse(code = 404, message = "The sink does not exist"), @ApiResponse(code = 500, message = "Internal server error (failed to authorize, failed to process package, etc.)"), @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.")})
    @Path("/{tenant}/{namespace}/{sinkName}")
    @Consumes({"multipart/form-data"})
    @ApiOperation("Updates a Pulsar Sink currently running in cluster mode")
    @PUT
    public void updateSink(@PathParam("tenant") @ApiParam("The sink's tenant") String str, @PathParam("namespace") @ApiParam("The sink's namespace") String str2, @PathParam("sinkName") @ApiParam("The sink's name") String str3, @FormDataParam("data") InputStream inputStream, @FormDataParam("data") FormDataContentDisposition formDataContentDisposition, @FormDataParam("url") String str4, @ApiParam(value = "A JSON value presenting a sink config playload. All available configuration options are:  \nclassname  \n   The sink's class name if archive is file-url-path (file://)  \nsourceSubscriptionName  \n   Pulsar source subscription name if user wants a specific  \n   subscription-name for input-topic consumer  \ninputs  \n   The sink's input topic or topics (specified as a JSON array)  \ntopicsPattern  \n   TopicsPattern to consume from list of topics under a namespace that    match the pattern. [input] and [topicsPattern] are mutually    exclusive. Add SerDe class name for a pattern in customSerdeInputs    (supported for java fun only)topicToSerdeClassName  \n   The map of input topics to SerDe class names (specified as a JSON object)  \ntopicToSchemaType  \n   The map of input topics to Schema types or class names (specified as a JSON object)  \ninputSpecs  \n   The map of input topics to its consumer configuration, each configuration has schema of    {\"schemaType\": \"type-x\", \"serdeClassName\": \"name-x\", \"isRegexPattern\": true, \"receiverQueueSize\": 5}  \nconfigs  \n   The map of configs (specified as a JSON object)  \nsecrets  \n   a map of secretName(aka how the secret is going to be \n   accessed in the function via context) to an object that \n   encapsulates how the secret is fetched by the underlying \n   secrets provider. The type of an value here can be found by the \n   SecretProviderConfigurator.getSecretObjectType() method. (specified as a JSON object)  \nparallelism  \n   The sink's parallelism factor (i.e. the number of sink instances to run \nprocessingGuarantees  \n   The processing guarantees (aka delivery semantics) applied to the sink. Possible Values: \"ATLEAST_ONCE\", \"ATMOST_ONCE\", \"EFFECTIVELY_ONCE\"  \nretainOrdering  \n   Boolean denotes whether sink consumes and sinks messages in order  \nresources  \n   {\"cpu\": 1, \"ram\": 2, \"disk\": 3} The CPU (in cores), RAM (in bytes) and disk (in bytes) that needs to be allocated per sink instance (applicable only to Docker runtime)  \nautoAck  \n   Boolean denotes whether or not the framework will automatically acknowledge messages  \ntimeoutMs  \n   Long denotes the message timeout in milliseconds  \ncleanupSubscription  \n   Boolean denotes whether the subscriptions the functions created/used should be deleted when the functions is deleted  \nruntimeFlags  \n   Any flags that you want to pass to the runtime as a single string  \n", examples = @Example({@ExampleProperty(mediaType = "application/json", value = "{  \n\t\"classname\": \"org.example.SinkStressTest\",  \n\t\"inputs\": [\"persistent://public/default/sink-input\"],\n\t\"processingGuarantees\": \"EFFECTIVELY_ONCE\",\n\t\"parallelism\": 5\n}")})) @FormDataParam("sinkConfig") String str5, @ApiParam @FormDataParam("updateOptions") UpdateOptions updateOptions) {
        this.sink.updateFunction(str, str2, str3, inputStream, formDataContentDisposition, str4, str5, clientAppId(), clientAuthData(), updateOptions);
    }

    @ApiResponses({@ApiResponse(code = 400, message = "Invalid deregister request"), @ApiResponse(code = 404, message = "The sink does not exist"), @ApiResponse(code = 200, message = "The sink was successfully deleted"), @ApiResponse(code = 401, message = "Client is not authorized to perform operation"), @ApiResponse(code = 500, message = "Internal server error (failed to authorize, failed to deregister, etc.)"), @ApiResponse(code = 408, message = "Got InterruptedException while deregistering the sink"), @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.")})
    @Path("/{tenant}/{namespace}/{sinkName}")
    @DELETE
    @ApiOperation("Deletes a Pulsar Sink currently running in cluster mode")
    public void deregisterSink(@PathParam("tenant") @ApiParam("The sink's tenant") String str, @PathParam("namespace") @ApiParam("The sink's namespace") String str2, @PathParam("sinkName") @ApiParam("The sink's name") String str3) {
        this.sink.deregisterFunction(str, str2, str3, clientAppId(), clientAuthData());
    }

    @GET
    @ApiResponses({@ApiResponse(code = 400, message = "Invalid request"), @ApiResponse(code = 404, message = "The sink does not exist"), @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.")})
    @Path("/{tenant}/{namespace}/{sinkName}")
    @ApiOperation(value = "Fetches information about a Pulsar Sink currently running in cluster mode", response = SinkConfig.class)
    public SinkConfig getSinkInfo(@PathParam("tenant") @ApiParam("The sink's tenant") String str, @PathParam("namespace") @ApiParam("The sink's namespace") String str2, @PathParam("sinkName") @ApiParam("The sink's name") String str3) throws IOException {
        return this.sink.getSinkInfo(str, str2, str3);
    }

    @GET
    @ApiResponses({@ApiResponse(code = 400, message = "The sink instance does not exist"), @ApiResponse(code = 404, message = "The sink does not exist"), @ApiResponse(code = 500, message = "Internal Server Error (got exception while getting status, etc.)"), @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.")})
    @Path("/{tenant}/{namespace}/{sinkName}/{instanceId}/status")
    @ApiOperation(value = "Displays the status of a Pulsar Sink instance", response = SinkStatus.SinkInstanceStatus.SinkInstanceStatusData.class)
    @Produces({"application/json"})
    public SinkStatus.SinkInstanceStatus.SinkInstanceStatusData getSinkInstanceStatus(@PathParam("tenant") @ApiParam("The sink's tenant") String str, @PathParam("namespace") @ApiParam("The sink's namespace") String str2, @PathParam("sinkName") @ApiParam("The sink's name") String str3, @PathParam("instanceId") @ApiParam("The sink instanceId") String str4) throws IOException {
        return this.sink.getSinkInstanceStatus(str, str2, str3, str4, this.uri.getRequestUri(), clientAppId(), clientAuthData());
    }

    @GET
    @ApiResponses({@ApiResponse(code = 400, message = "Invalid get status request"), @ApiResponse(code = 401, message = "The client is not authorized to perform this operation"), @ApiResponse(code = 404, message = "The sink does not exist"), @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.")})
    @Path("/{tenant}/{namespace}/{sinkName}/status")
    @ApiOperation(value = "Displays the status of a Pulsar Sink running in cluster mode", response = SinkStatus.class)
    @Produces({"application/json"})
    public SinkStatus getSinkStatus(@PathParam("tenant") @ApiParam("The sink's tenant") String str, @PathParam("namespace") @ApiParam("The sink's namespace") String str2, @PathParam("sinkName") @ApiParam("The sink's name") String str3) throws IOException {
        return this.sink.getSinkStatus(str, str2, str3, this.uri.getRequestUri(), clientAppId(), clientAuthData());
    }

    @GET
    @ApiResponses({@ApiResponse(code = 400, message = "Invalid list request"), @ApiResponse(code = 401, message = "The client is not authorized to perform this operation"), @ApiResponse(code = 500, message = "Internal server error (failed to authorize, etc.)"), @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.")})
    @Path("/{tenant}/{namespace}")
    @ApiOperation(value = "Lists all Pulsar Sinks currently deployed in a given namespace", response = String.class, responseContainer = "Collection")
    public List<String> listSinks(@PathParam("tenant") @ApiParam("The sink's tenant") String str, @PathParam("namespace") @ApiParam("The sink's namespace") String str2) {
        return this.sink.listFunctions(str, str2, clientAppId(), clientAuthData());
    }

    @ApiResponses({@ApiResponse(code = 400, message = "Invalid restart request"), @ApiResponse(code = 401, message = "The client is not authorized to perform this operation"), @ApiResponse(code = 404, message = "The sink does not exist"), @ApiResponse(code = 500, message = "Internal server error (failed to restart the sink instance, failed to authorize, etc.)"), @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.")})
    @Path("/{tenant}/{namespace}/{sinkName}/{instanceId}/restart")
    @Consumes({"application/json"})
    @ApiOperation(value = "Restart sink instance", response = Void.class)
    @POST
    public void restartSink(@PathParam("tenant") @ApiParam("The sink's tenant") String str, @PathParam("namespace") @ApiParam("The sink's namespace") String str2, @PathParam("sinkName") @ApiParam("The sink's name") String str3, @PathParam("instanceId") @ApiParam("The sink instanceId") String str4) {
        this.sink.restartFunctionInstance(str, str2, str3, str4, this.uri.getRequestUri(), clientAppId(), clientAuthData());
    }

    @ApiResponses({@ApiResponse(code = 400, message = "Invalid restart request"), @ApiResponse(code = 401, message = "The client is not authorized to perform this operation"), @ApiResponse(code = 404, message = "The sink does not exist"), @ApiResponse(code = 500, message = "Internal server error (failed to restart the sink, failed to authorize, etc.)"), @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.")})
    @Path("/{tenant}/{namespace}/{sinkName}/restart")
    @Consumes({"application/json"})
    @ApiOperation(value = "Restart all sink instances", response = Void.class)
    @POST
    public void restartSink(@PathParam("tenant") @ApiParam("The sink's tenant") String str, @PathParam("namespace") @ApiParam("The sink's namespace") String str2, @PathParam("sinkName") @ApiParam("The sink's name") String str3) {
        this.sink.restartFunctionInstances(str, str2, str3, clientAppId(), clientAuthData());
    }

    @ApiResponses({@ApiResponse(code = 400, message = "Invalid stop request"), @ApiResponse(code = 404, message = "The sink instance does not exist"), @ApiResponse(code = 500, message = "Internal server error (failed to stop the sink, failed to authorize, etc.)"), @ApiResponse(code = 401, message = "The client is not authorized to perform this operation"), @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.")})
    @Path("/{tenant}/{namespace}/{sinkName}/{instanceId}/stop")
    @Consumes({"application/json"})
    @ApiOperation(value = "Stop sink instance", response = Void.class)
    @POST
    public void stopSink(@PathParam("tenant") @ApiParam("The sink's tenant") String str, @PathParam("namespace") @ApiParam("The sink's namespace") String str2, @PathParam("sinkName") @ApiParam("The sink's name") String str3, @PathParam("instanceId") @ApiParam("The sink instanceId") String str4) {
        this.sink.stopFunctionInstance(str, str2, str3, str4, this.uri.getRequestUri(), clientAppId(), clientAuthData());
    }

    @ApiResponses({@ApiResponse(code = 400, message = "Invalid stop request"), @ApiResponse(code = 404, message = "The sink does not exist"), @ApiResponse(code = 500, message = "Internal server error (failed to stop the sink, failed to authorize, etc.)"), @ApiResponse(code = 401, message = "The client is not authorized to perform this operation"), @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.")})
    @Path("/{tenant}/{namespace}/{sinkName}/stop")
    @Consumes({"application/json"})
    @ApiOperation(value = "Stop all sink instances", response = Void.class)
    @POST
    public void stopSink(@PathParam("tenant") @ApiParam("The sink's tenant") String str, @PathParam("namespace") @ApiParam("The sink's namespace") String str2, @PathParam("sinkName") @ApiParam("The sink's name") String str3) {
        this.sink.stopFunctionInstances(str, str2, str3, clientAppId(), clientAuthData());
    }

    @ApiResponses({@ApiResponse(code = 400, message = "Invalid start request"), @ApiResponse(code = 404, message = "The sink does not exist"), @ApiResponse(code = 500, message = "Internal server error (failed to start the sink, failed to authorize, etc.)"), @ApiResponse(code = 401, message = "The client is not authorized to perform this operation"), @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.")})
    @Path("/{tenant}/{namespace}/{sinkName}/{instanceId}/start")
    @Consumes({"application/json"})
    @ApiOperation(value = "Start sink instance", response = Void.class)
    @POST
    public void startSink(@PathParam("tenant") @ApiParam("The sink's tenant") String str, @PathParam("namespace") @ApiParam("The sink's namespace") String str2, @PathParam("sinkName") @ApiParam("The sink's name") String str3, @PathParam("instanceId") @ApiParam("The sink instanceId") String str4) {
        this.sink.startFunctionInstance(str, str2, str3, str4, this.uri.getRequestUri(), clientAppId(), clientAuthData());
    }

    @ApiResponses({@ApiResponse(code = 400, message = "Invalid start request"), @ApiResponse(code = 404, message = "The sink does not exist"), @ApiResponse(code = 500, message = "Internal server error (failed to start the sink, failed to authorize, etc.)"), @ApiResponse(code = 401, message = "The client is not authorized to perform this operation"), @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.")})
    @Path("/{tenant}/{namespace}/{sinkName}/start")
    @Consumes({"application/json"})
    @ApiOperation(value = "Start all sink instances", response = Void.class)
    @POST
    public void startSink(@PathParam("tenant") @ApiParam("The sink's tenant") String str, @PathParam("namespace") @ApiParam("The sink's namespace") String str2, @PathParam("sinkName") @ApiParam("The sink's name") String str3) {
        this.sink.startFunctionInstances(str, str2, str3, clientAppId(), clientAuthData());
    }

    @GET
    @ApiResponses({@ApiResponse(code = 200, message = "Get builtin sinks successfully.")})
    @Path("/builtinsinks")
    @ApiOperation(value = "Fetches a list of supported Pulsar IO sink connectors currently running in cluster mode", response = List.class)
    public List<ConnectorDefinition> getSinkList() {
        List<ConnectorDefinition> listOfConnectors = this.sink.getListOfConnectors();
        ArrayList arrayList = new ArrayList();
        for (ConnectorDefinition connectorDefinition : listOfConnectors) {
            if (!StringUtils.isEmpty(connectorDefinition.getSinkClass())) {
                arrayList.add(connectorDefinition);
            }
        }
        return arrayList;
    }
}
