/*
 * Decompiled with CFR 0.152.
 */
package org.openmetadata.service.resources.services.ingestionpipelines;

import io.swagger.annotations.Api;
import io.swagger.v3.oas.annotations.ExternalDocumentation;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.media.Content;
import io.swagger.v3.oas.annotations.media.ExampleObject;
import io.swagger.v3.oas.annotations.media.Schema;
import io.swagger.v3.oas.annotations.parameters.RequestBody;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import java.io.IOException;
import java.net.http.HttpResponse;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import javax.json.JsonPatch;
import javax.validation.Valid;
import javax.validation.constraints.Max;
import javax.validation.constraints.Min;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.PATCH;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.SecurityContext;
import javax.ws.rs.core.UriInfo;
import org.openmetadata.common.utils.CommonUtil;
import org.openmetadata.schema.CreateEntity;
import org.openmetadata.schema.api.services.ingestionPipelines.CreateIngestionPipeline;
import org.openmetadata.schema.api.services.ingestionPipelines.TestServiceConnection;
import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline;
import org.openmetadata.schema.services.connections.metadata.OpenMetadataServerConnection;
import org.openmetadata.schema.type.EntityHistory;
import org.openmetadata.schema.type.Include;
import org.openmetadata.schema.type.MetadataOperation;
import org.openmetadata.service.Entity;
import org.openmetadata.service.OpenMetadataApplicationConfig;
import org.openmetadata.service.airflow.AirflowRESTClient;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.jdbi3.IngestionPipelineRepository;
import org.openmetadata.service.jdbi3.ListFilter;
import org.openmetadata.service.resources.Collection;
import org.openmetadata.service.resources.EntityResource;
import org.openmetadata.service.secrets.SecretsManager;
import org.openmetadata.service.secrets.SecretsManagerFactory;
import org.openmetadata.service.security.AuthorizationException;
import org.openmetadata.service.security.Authorizer;
import org.openmetadata.service.security.policyevaluator.OperationContext;
import org.openmetadata.service.util.EntityUtil;
import org.openmetadata.service.util.OpenMetadataServerConnectionBuilder;
import org.openmetadata.service.util.PipelineServiceClient;
import org.openmetadata.service.util.ResultList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Path(value="/v1/services/ingestionPipelines/")
@Api(value="Ingestion collection", tags={"Ingestion collection"})
@Produces(value={"application/json"})
@Consumes(value={"application/json"})
@Collection(name="IngestionPipelines")
public class IngestionPipelineResource
extends EntityResource<IngestionPipeline, IngestionPipelineRepository> {
    private static final Logger LOG = LoggerFactory.getLogger(IngestionPipelineResource.class);
    public static final String COLLECTION_PATH = "v1/services/ingestionPipelines/";
    private PipelineServiceClient pipelineServiceClient;
    private OpenMetadataApplicationConfig openMetadataApplicationConfig;
    static final String FIELDS = "owner";

    @Override
    public IngestionPipeline addHref(UriInfo uriInfo, IngestionPipeline ingestionPipeline) {
        Entity.withHref(uriInfo, ingestionPipeline.getOwner());
        Entity.withHref(uriInfo, ingestionPipeline.getService());
        return ingestionPipeline;
    }

    public IngestionPipelineResource(CollectionDAO dao, Authorizer authorizer) {
        super(IngestionPipeline.class, new IngestionPipelineRepository(dao), authorizer);
    }

    public void initialize(OpenMetadataApplicationConfig config) {
        this.openMetadataApplicationConfig = config;
        this.pipelineServiceClient = new AirflowRESTClient(this.openMetadataApplicationConfig.getAirflowConfiguration());
        ((IngestionPipelineRepository)this.dao).setPipelineServiceClient(this.pipelineServiceClient);
    }

    @GET
    @Valid
    @Operation(operationId="listIngestionPipelines", summary="List Ingestion Pipelines for Metadata Operations", tags={"IngestionPipelines"}, description="Get a list of Airflow Pipelines for Metadata Operations. Use `fields` parameter to get only necessary fields.  Use cursor-based pagination to limit the number entries in the list using `limit` and `before` or `after` query params.", responses={@ApiResponse(responseCode="200", description="List of ingestion workflows", content={@Content(mediaType="application/json", schema=@Schema(implementation=IngestionPipeline.class))})})
    public ResultList<IngestionPipeline> list(@Context UriInfo uriInfo, @Context SecurityContext securityContext, @Parameter(description="Fields requested in the returned resource", schema=@Schema(type="string", example="owner")) @QueryParam(value="fields") String fieldsParam, @Parameter(description="Filter airflow pipelines by service fully qualified name", schema=@Schema(type="string", example="snowflakeWestCoast")) @QueryParam(value="service") String serviceParam, @Parameter(description="Limit the number ingestion returned. (1 to 1000000, default = 10)") @DefaultValue(value="10") @Min(value=0L) @Max(value=1000000L) @QueryParam(value="limit") @Min(value=0L) @Max(value=1000000L) int limitParam, @Parameter(description="Returns list of ingestion before this cursor", schema=@Schema(type="string")) @QueryParam(value="before") String before, @Parameter(description="Returns list of ingestion after this cursor", schema=@Schema(type="string")) @QueryParam(value="after") String after, @Parameter(description="Include all, deleted, or non-deleted entities.", schema=@Schema(implementation=Include.class)) @QueryParam(value="include") @DefaultValue(value="non-deleted") Include include) throws IOException {
        ListFilter filter = new ListFilter(include).addQueryParam("service", serviceParam);
        ResultList<IngestionPipeline> ingestionPipelines = super.listInternal(uriInfo, securityContext, fieldsParam, filter, limitParam, before, after);
        if (fieldsParam != null && fieldsParam.contains("pipelineStatuses")) {
            this.addStatus(ingestionPipelines.getData());
        }
        CommonUtil.listOrEmpty(ingestionPipelines.getData()).forEach(ingestionPipeline -> this.decryptOrNullify(securityContext, (IngestionPipeline)ingestionPipeline));
        return ingestionPipelines;
    }

    @GET
    @Path(value="/{id}/versions")
    @Operation(operationId="listAllIngestionPipelineVersion", summary="List ingestion workflow versions", tags={"IngestionPipelines"}, description="Get a list of all the versions of a IngestionPipeline identified by `id`", responses={@ApiResponse(responseCode="200", description="List of IngestionPipeline versions", content={@Content(mediaType="application/json", schema=@Schema(implementation=EntityHistory.class))})})
    public EntityHistory listVersions(@Context UriInfo uriInfo, @Context SecurityContext securityContext, @Parameter(description="IngestionPipeline Id", schema=@Schema(type="string")) @PathParam(value="id") UUID id) throws IOException {
        return super.listVersionsInternal(securityContext, id);
    }

    @GET
    @Path(value="/{id}")
    @Operation(operationId="getIngestionPipelineByID", summary="Get a IngestionPipeline", tags={"IngestionPipelines"}, description="Get a IngestionPipeline by `id`.", responses={@ApiResponse(responseCode="200", description="The ingestion", content={@Content(mediaType="application/json", schema=@Schema(implementation=IngestionPipeline.class))}), @ApiResponse(responseCode="404", description="IngestionPipeline for instance {id} is not found")})
    public IngestionPipeline get(@Context UriInfo uriInfo, @Context SecurityContext securityContext, @PathParam(value="id") UUID id, @Parameter(description="Fields requested in the returned resource", schema=@Schema(type="string", example="owner")) @QueryParam(value="fields") String fieldsParam, @Parameter(description="Include all, deleted, or non-deleted entities.", schema=@Schema(implementation=Include.class)) @QueryParam(value="include") @DefaultValue(value="non-deleted") Include include) throws IOException {
        IngestionPipeline ingestionPipeline = (IngestionPipeline)this.getInternal(uriInfo, securityContext, id, fieldsParam, include);
        if (fieldsParam != null && fieldsParam.contains("pipelineStatuses")) {
            ingestionPipeline = this.addStatus(ingestionPipeline);
        }
        return this.decryptOrNullify(securityContext, ingestionPipeline);
    }

    @GET
    @Path(value="/{id}/versions/{version}")
    @Operation(operationId="getSpecificIngestionPipelineVersion", summary="Get a version of the IngestionPipeline", tags={"IngestionPipelines"}, description="Get a version of the IngestionPipeline by given `id`", responses={@ApiResponse(responseCode="200", description="IngestionPipelines", content={@Content(mediaType="application/json", schema=@Schema(implementation=IngestionPipeline.class))}), @ApiResponse(responseCode="404", description="IngestionPipeline for instance {id} and version  {version} is not found")})
    public IngestionPipeline getVersion(@Context UriInfo uriInfo, @Context SecurityContext securityContext, @Parameter(description="Ingestion Id", schema=@Schema(type="string")) @PathParam(value="id") UUID id, @Parameter(description="Ingestion version number in the form `major`.`minor`", schema=@Schema(type="string", example="0.1 or 1.1")) @PathParam(value="version") String version) throws IOException {
        return (IngestionPipeline)super.getVersionInternal(securityContext, id, version);
    }

    @GET
    @Path(value="/name/{fqn}")
    @Operation(operationId="getSpecificIngestionPipelineByFQN", summary="Get a IngestionPipeline by name", tags={"IngestionPipelines"}, description="Get a ingestion by fully qualified name.", responses={@ApiResponse(responseCode="200", description="IngestionPipeline", content={@Content(mediaType="application/json", schema=@Schema(implementation=IngestionPipeline.class))}), @ApiResponse(responseCode="404", description="Ingestion for instance {id} is not found")})
    public IngestionPipeline getByName(@Context UriInfo uriInfo, @PathParam(value="fqn") String fqn, @Context SecurityContext securityContext, @Parameter(description="Fields requested in the returned resource", schema=@Schema(type="string", example="owner")) @QueryParam(value="fields") String fieldsParam, @Parameter(description="Include all, deleted, or non-deleted entities.", schema=@Schema(implementation=Include.class)) @QueryParam(value="include") @DefaultValue(value="non-deleted") Include include) throws IOException {
        IngestionPipeline ingestionPipeline = (IngestionPipeline)this.getByNameInternal(uriInfo, securityContext, fqn, fieldsParam, include);
        if (fieldsParam != null && fieldsParam.contains("pipelineStatuses")) {
            ingestionPipeline = this.addStatus(ingestionPipeline);
        }
        return this.decryptOrNullify(securityContext, ingestionPipeline);
    }

    @POST
    @Operation(operationId="createIngestionPipeline", summary="Create a Ingestion Pipeline", tags={"IngestionPipelines"}, description="Create a new Ingestion Pipeline.", responses={@ApiResponse(responseCode="200", description="The Ingestion Pipeline", content={@Content(mediaType="application/json", schema=@Schema(implementation=IngestionPipeline.class))}), @ApiResponse(responseCode="400", description="Bad request")})
    public Response create(@Context UriInfo uriInfo, @Context SecurityContext securityContext, @Valid CreateIngestionPipeline create) throws IOException {
        IngestionPipeline ingestionPipeline = this.getIngestionPipeline(create, securityContext.getUserPrincipal().getName());
        Response response = this.create(uriInfo, securityContext, ingestionPipeline, true);
        this.decryptOrNullify(securityContext, (IngestionPipeline)response.getEntity());
        return response;
    }

    @PATCH
    @Path(value="/{id}")
    @Operation(operationId="patchIngestionPipeline", summary="Update a IngestionPipeline", tags={"IngestionPipelines"}, description="Update an existing IngestionPipeline using JsonPatch.", externalDocs=@ExternalDocumentation(description="JsonPatch RFC", url="https://tools.ietf.org/html/rfc6902"))
    @Consumes(value={"application/json-patch+json"})
    public Response updateDescription(@Context UriInfo uriInfo, @Context SecurityContext securityContext, @PathParam(value="id") UUID id, @RequestBody(description="JsonPatch with array of operations", content={@Content(mediaType="application/json-patch+json", examples={@ExampleObject(value="[{op:remove, path:/a},{op:add, path: /b, value: val}]")})}) JsonPatch patch) throws IOException {
        Response response = this.patchInternal(uriInfo, securityContext, id, patch);
        this.decryptOrNullify(securityContext, (IngestionPipeline)response.getEntity());
        return response;
    }

    @PUT
    @Operation(operationId="createOrUpdateIngestionPipeline", summary="Create or update a IngestionPipeline", tags={"IngestionPipelines"}, description="Create a new IngestionPipeline, if it does not exist or update an existing IngestionPipeline.", responses={@ApiResponse(responseCode="200", description="The IngestionPipeline", content={@Content(mediaType="application/json", schema=@Schema(implementation=IngestionPipeline.class))}), @ApiResponse(responseCode="400", description="Bad request")})
    public Response createOrUpdate(@Context UriInfo uriInfo, @Context SecurityContext securityContext, @Valid CreateIngestionPipeline update) throws IOException {
        IngestionPipeline ingestionPipeline = this.getIngestionPipeline(update, securityContext.getUserPrincipal().getName());
        Response response = this.createOrUpdate(uriInfo, securityContext, ingestionPipeline, true);
        this.decryptOrNullify(securityContext, (IngestionPipeline)response.getEntity());
        return response;
    }

    @POST
    @Path(value="/deploy/{id}")
    @Operation(summary="Deploy a ingestion pipeline run", tags={"IngestionPipelines"}, description="Trigger a ingestion pipeline run by id.", responses={@ApiResponse(responseCode="200", description="The ingestion", content={@Content(mediaType="application/json", schema=@Schema(implementation=IngestionPipeline.class))}), @ApiResponse(responseCode="404", description="Ingestion for instance {id} is not found")})
    public IngestionPipeline deployIngestion(@Context UriInfo uriInfo, @PathParam(value="id") UUID id, @Context SecurityContext securityContext) throws IOException {
        EntityUtil.Fields fields = this.getFields(FIELDS);
        IngestionPipeline ingestionPipeline = (IngestionPipeline)((IngestionPipelineRepository)this.dao).get(uriInfo, id, fields);
        ingestionPipeline.setOpenMetadataServerConnection(new OpenMetadataServerConnectionBuilder(this.openMetadataApplicationConfig).build());
        this.pipelineServiceClient.deployPipeline(ingestionPipeline);
        this.decryptOrNullify(securityContext, ingestionPipeline);
        return this.addHref(uriInfo, ingestionPipeline);
    }

    @POST
    @Path(value="/trigger/{id}")
    @Operation(operationId="triggerIngestionPipelineRun", summary="Trigger a ingestion pipeline run", tags={"IngestionPipelines"}, description="Trigger a ingestion pipeline run by id.", responses={@ApiResponse(responseCode="200", description="The ingestion", content={@Content(mediaType="application/json", schema=@Schema(implementation=IngestionPipeline.class))}), @ApiResponse(responseCode="404", description="Ingestion for instance {id} is not found")})
    public IngestionPipeline triggerIngestion(@Context UriInfo uriInfo, @PathParam(value="id") UUID id, @Context SecurityContext securityContext) throws IOException {
        EntityUtil.Fields fields = this.getFields(FIELDS);
        IngestionPipeline ingestionPipeline = (IngestionPipeline)((IngestionPipelineRepository)this.dao).get(uriInfo, id, fields);
        this.pipelineServiceClient.runPipeline(ingestionPipeline.getName());
        this.decryptOrNullify(securityContext, ingestionPipeline);
        return this.addHref(uriInfo, ingestionPipeline);
    }

    @POST
    @Path(value="/toggleIngestion/{id}")
    @Operation(operationId="toggleIngestionPipelineEnabled", summary="Set an Ingestion pipeline either as Enabled or Disabled", tags={"IngestionPipelines"}, description="Toggle an ingestion pipeline state by id.", responses={@ApiResponse(responseCode="200", description="The ingestion", content={@Content(mediaType="application/json", schema=@Schema(implementation=IngestionPipeline.class))}), @ApiResponse(responseCode="404", description="Ingestion for instance {id} is not found")})
    public Response toggleIngestion(@Context UriInfo uriInfo, @PathParam(value="id") UUID id, @Context SecurityContext securityContext) throws IOException {
        EntityUtil.Fields fields = this.getFields(FIELDS);
        IngestionPipeline pipeline = (IngestionPipeline)((IngestionPipelineRepository)this.dao).get(uriInfo, id, fields);
        this.pipelineServiceClient.toggleIngestion(pipeline);
        return this.createOrUpdate(uriInfo, securityContext, pipeline, true);
    }

    @POST
    @Path(value="/kill/{id}")
    @Operation(operationId="killIngestionPipelineRuns", summary="Mark as failed and kill any not-finished workflow or task for the IngestionPipeline", tags={"IngestionPipelines"}, description="Kill an ingestion pipeline by ID.", responses={@ApiResponse(responseCode="200", description="The ingestion", content={@Content(mediaType="application/json", schema=@Schema(implementation=IngestionPipeline.class))}), @ApiResponse(responseCode="404", description="Ingestion for instance {id} is not found")})
    public Response killIngestion(@Context UriInfo uriInfo, @PathParam(value="id") UUID id, @Context SecurityContext securityContext) throws IOException {
        IngestionPipeline ingestionPipeline = (IngestionPipeline)this.getInternal(uriInfo, securityContext, id, FIELDS, Include.NON_DELETED);
        this.decryptOrNullify(securityContext, ingestionPipeline);
        HttpResponse<String> response = this.pipelineServiceClient.killIngestion(ingestionPipeline);
        return Response.status((int)200, (String)response.body()).build();
    }

    @POST
    @Path(value="/testConnection")
    @Operation(operationId="testConnection", summary="Test Connection of a Service", tags={"IngestionPipelines"}, description="Test Connection of a Service.", responses={@ApiResponse(responseCode="200", description="The ingestion", content={@Content(mediaType="application/json")})})
    public Response testIngestion(@Context UriInfo uriInfo, @Context SecurityContext securityContext, @Valid TestServiceConnection testServiceConnection) {
        SecretsManager secretsManager = SecretsManagerFactory.getSecretsManager();
        testServiceConnection = testServiceConnection.withConnection(secretsManager.storeTestConnectionObject(testServiceConnection)).withSecretsManagerProvider(secretsManager.getSecretsManagerProvider()).withClusterName(this.openMetadataApplicationConfig.getClusterName());
        HttpResponse<String> response = this.pipelineServiceClient.testConnection(testServiceConnection);
        return Response.status((int)200, (String)response.body()).build();
    }

    @GET
    @Path(value="/status")
    @Operation(operationId="checkRestAirflowStatus", summary="Check the Airflow REST status", tags={"IngestionPipelines"}, description="Check that the Airflow REST endpoint is reachable and up and running", responses={@ApiResponse(responseCode="200", description="Status message", content={@Content(mediaType="application/json")})})
    public Response getRESTStatus(@Context UriInfo uriInfo, @Context SecurityContext securityContext) {
        return this.pipelineServiceClient.getServiceStatus();
    }

    @GET
    @Path(value="/ip")
    @Operation(operationId="checkAirflowHostIp", summary="Check the Airflow REST host IP", tags={"IngestionPipelines"}, description="Check the Airflow REST host IP", responses={@ApiResponse(responseCode="200", description="Pipeline Service host IP", content={@Content(mediaType="application/json")})})
    public Response getHostIp(@Context UriInfo uriInfo, @Context SecurityContext securityContext) {
        Map<String, String> hostIp = this.pipelineServiceClient.getHostIp();
        return Response.ok(hostIp, (MediaType)MediaType.APPLICATION_JSON_TYPE).build();
    }

    @DELETE
    @Path(value="/{id}")
    @Operation(operationId="deleteIngestionPipeline", summary="Delete a Ingestion", tags={"IngestionPipelines"}, description="Delete a ingestion by `id`.", responses={@ApiResponse(responseCode="200", description="OK"), @ApiResponse(responseCode="404", description="Ingestion for instance {id} is not found")})
    public Response delete(@Context UriInfo uriInfo, @Context SecurityContext securityContext, @Parameter(description="Hard delete the entity. (Default = `false`)") @QueryParam(value="hardDelete") @DefaultValue(value="false") boolean hardDelete, @Parameter(description="Pipeline Id", schema=@Schema(type="string")) @PathParam(value="id") UUID id) throws IOException {
        return this.delete(uriInfo, securityContext, id, false, hardDelete, true);
    }

    @GET
    @Path(value="/logs/{id}/last")
    @Operation(summary="Retrieve all logs from last ingestion pipeline run", tags={"IngestionPipelines"}, description="Get all logs from last ingestion pipeline run by `id`.", responses={@ApiResponse(responseCode="200", description="JSON object with the task instance name of the ingestion on each key and log in the value", content={@Content(mediaType="application/json")}), @ApiResponse(responseCode="404", description="Logs for instance {id} is not found")})
    public Response getLastIngestionLogs(@Context UriInfo uriInfo, @Context SecurityContext securityContext, @Parameter(description="Pipeline Id", schema=@Schema(type="string")) @PathParam(value="id") UUID id) throws IOException {
        IngestionPipeline ingestionPipeline = (IngestionPipeline)this.getInternal(uriInfo, securityContext, id, FIELDS, Include.NON_DELETED);
        Map<String, String> lastIngestionLogs = this.pipelineServiceClient.getLastIngestionLogs(ingestionPipeline);
        return Response.ok(lastIngestionLogs, (MediaType)MediaType.APPLICATION_JSON_TYPE).build();
    }

    private IngestionPipeline getIngestionPipeline(CreateIngestionPipeline create, String user) throws IOException {
        OpenMetadataServerConnection openMetadataServerConnection = new OpenMetadataServerConnectionBuilder(this.openMetadataApplicationConfig).build();
        return this.copy(new IngestionPipeline(), (CreateEntity)create, user).withPipelineType(create.getPipelineType()).withAirflowConfig(create.getAirflowConfig()).withOpenMetadataServerConnection(openMetadataServerConnection).withSourceConfig(create.getSourceConfig()).withLoggerLevel(create.getLoggerLevel()).withService(create.getService());
    }

    public void addStatus(List<IngestionPipeline> ingestionPipelines) {
        CommonUtil.listOrEmpty(ingestionPipelines).forEach(this::addStatus);
    }

    private IngestionPipeline addStatus(IngestionPipeline ingestionPipeline) {
        try {
            ingestionPipeline = this.pipelineServiceClient.getPipelineStatus(ingestionPipeline);
        }
        catch (Exception e) {
            LOG.error("Failed to fetch status for {} due to {}", (Object)ingestionPipeline.getName(), (Object)e);
        }
        return ingestionPipeline;
    }

    private IngestionPipeline decryptOrNullify(SecurityContext securityContext, IngestionPipeline ingestionPipeline) {
        SecretsManager secretsManager = SecretsManagerFactory.getSecretsManager();
        try {
            this.authorizer.authorize(securityContext, new OperationContext(this.entityType, MetadataOperation.VIEW_ALL), this.getResourceContextById(ingestionPipeline.getId()), secretsManager.isLocal());
        }
        catch (IOException | AuthorizationException e) {
            ingestionPipeline.getSourceConfig().setConfig(null);
            return ingestionPipeline;
        }
        secretsManager.encryptOrDecryptDbtConfigSource(ingestionPipeline, false);
        return ingestionPipeline;
    }

    public static class IngestionPipelineList
    extends ResultList<IngestionPipeline> {
        public IngestionPipelineList() {
        }

        public IngestionPipelineList(List<IngestionPipeline> data, String beforeCursor, String afterCursor, int total) {
            super(data, beforeCursor, afterCursor, total);
        }
    }
}

