package org.openmetadata.service.airflow;

import com.fasterxml.jackson.core.type.TypeReference;
import java.io.IOException;
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import javax.ws.rs.core.Response;
import org.json.JSONObject;
import org.openmetadata.schema.api.configuration.airflow.AirflowConfiguration;
import org.openmetadata.schema.api.services.ingestionPipelines.TestServiceConnection;
import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline;
import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineStatus;
import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineType;
import org.openmetadata.service.exception.IngestionPipelineDeploymentException;
import org.openmetadata.service.exception.PipelineServiceClientException;
import org.openmetadata.service.security.JwtFilter;
import org.openmetadata.service.security.auth.BotTokenCache;
import org.openmetadata.service.util.JsonUtils;
import org.openmetadata.service.util.PipelineServiceClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/openmetadata/service/airflow/AirflowRESTClient.class */
public class AirflowRESTClient extends PipelineServiceClient {
    private static final String API_ENDPOINT = "api/v1/openmetadata";
    private static final String DAG_ID = "dag_id";
    private static final Logger LOG = LoggerFactory.getLogger(AirflowRESTClient.class);
    private static final Map<String, String> TYPE_TO_TASK = Map.of(PipelineType.METADATA.toString(), "ingestion_task", PipelineType.PROFILER.toString(), "profiler_task", PipelineType.LINEAGE.toString(), "lineage_task", PipelineType.DBT.toString(), "dbt_task", PipelineType.USAGE.toString(), "usage_task", PipelineType.TEST_SUITE.toString(), "test_suite_task", PipelineType.DATA_INSIGHT.toString(), "data_insight_task", PipelineType.ELASTIC_SEARCH_REINDEX.toString(), "elasticsearch_reindex_task");

    public AirflowRESTClient(AirflowConfiguration airflowConfiguration) {
        super(airflowConfiguration.getUsername(), airflowConfiguration.getPassword(), airflowConfiguration.getApiEndpoint(), airflowConfiguration.getHostIp(), airflowConfiguration.getTimeout().intValue());
    }

    @Override // org.openmetadata.service.util.PipelineServiceClient
    public String deployPipeline(IngestionPipeline ingestionPipeline) {
        try {
            HttpResponse<String> post = post(String.format("%s/%s/deploy", this.serviceURL, API_ENDPOINT), JsonUtils.pojoToJson(ingestionPipeline));
            if (post.statusCode() != 200) {
                throw new PipelineServiceClientException(String.format("%s Failed to deploy Ingestion Pipeline due to airflow API returned %s and response %s", ingestionPipeline.getName(), Response.Status.fromStatusCode(post.statusCode()), post.body()));
            }
            ingestionPipeline.setDeployed(true);
            return (String) post.body();
        } catch (Exception e) {
            throw IngestionPipelineDeploymentException.byMessage(ingestionPipeline.getName(), e.getMessage());
        }
    }

    @Override // org.openmetadata.service.util.PipelineServiceClient
    public String deletePipeline(String str) {
        try {
            return (String) deleteRequestAuthenticatedForJsonContent("%s/%s/delete?dag_id=%s", this.serviceURL, API_ENDPOINT, str).body();
        } catch (Exception e) {
            LOG.error(String.format("Failed to delete Airflow Pipeline %s from Airflow DAGS", str));
            return null;
        }
    }

    @Override // org.openmetadata.service.util.PipelineServiceClient
    public String runPipeline(String str) {
        try {
            String format = String.format("%s/%s/trigger", this.serviceURL, API_ENDPOINT);
            JSONObject jSONObject = new JSONObject();
            jSONObject.put(DAG_ID, str);
            HttpResponse<String> post = post(format, jSONObject.toString());
            if (post.statusCode() == 200) {
                return (String) post.body();
            }
            throw IngestionPipelineDeploymentException.byMessage(str, "Failed to trigger IngestionPipeline", Response.Status.fromStatusCode(post.statusCode()));
        } catch (Exception e) {
            throw IngestionPipelineDeploymentException.byMessage(str, e.getMessage());
        }
    }

    @Override // org.openmetadata.service.util.PipelineServiceClient
    public IngestionPipeline toggleIngestion(IngestionPipeline ingestionPipeline) {
        HttpResponse<String> post;
        try {
            JSONObject jSONObject = new JSONObject();
            jSONObject.put(DAG_ID, ingestionPipeline.getName());
            if (ingestionPipeline.getEnabled().equals(Boolean.TRUE)) {
                post = post(String.format("%s/%s/disable", this.serviceURL, API_ENDPOINT), jSONObject.toString());
                if (post.statusCode() == 200) {
                    ingestionPipeline.setEnabled(false);
                    return ingestionPipeline;
                }
                if (post.statusCode() == 404) {
                    ingestionPipeline.setDeployed(false);
                    return ingestionPipeline;
                }
            } else {
                post = post(String.format("%s/%s/enable", this.serviceURL, API_ENDPOINT), jSONObject.toString());
                if (post.statusCode() == 200) {
                    ingestionPipeline.setEnabled(true);
                    return ingestionPipeline;
                }
                if (post.statusCode() == 404) {
                    ingestionPipeline.setDeployed(false);
                    return ingestionPipeline;
                }
            }
            throw PipelineServiceClientException.byMessage(ingestionPipeline.getName(), "Failed to toggle ingestion pipeline state", Response.Status.fromStatusCode(post.statusCode()));
        } catch (Exception e) {
            throw PipelineServiceClientException.byMessage(ingestionPipeline.getName(), e.getMessage());
        }
    }

    @Override // org.openmetadata.service.util.PipelineServiceClient
    public List<PipelineStatus> getQueuedPipelineStatus(IngestionPipeline ingestionPipeline) {
        try {
            HttpResponse<String> requestAuthenticatedForJsonContent = getRequestAuthenticatedForJsonContent("%s/%s/status?dag_id=%s&only_queued=true", this.serviceURL, API_ENDPOINT, ingestionPipeline.getName());
            if (requestAuthenticatedForJsonContent.statusCode() == 200) {
                return JsonUtils.readObjects((String) requestAuthenticatedForJsonContent.body(), PipelineStatus.class);
            }
            throw PipelineServiceClientException.byMessage(ingestionPipeline.getName(), "Failed to fetch ingestion pipeline runs", Response.Status.fromStatusCode(requestAuthenticatedForJsonContent.statusCode()));
        } catch (Exception e) {
            throw PipelineServiceClientException.byMessage(ingestionPipeline.getName(), e.getMessage());
        }
    }

    @Override // org.openmetadata.service.util.PipelineServiceClient
    public Response getServiceStatus() {
        try {
            HttpResponse<String> requestNoAuthForJsonContent = getRequestNoAuthForJsonContent(this.serviceURL, API_ENDPOINT);
            if (requestNoAuthForJsonContent.statusCode() != 200) {
                throw new PipelineServiceClientException(String.format("Failed to get REST status due to %s.", requestNoAuthForJsonContent.body()));
            }
            String string = new JSONObject((String) requestNoAuthForJsonContent.body()).getString("version");
            return Boolean.TRUE.equals(validServerClientVersions(string)) ? Response.status(200, Map.of("status", "healthy").toString()).build() : Response.status(500, Map.of("status", "unhealthy", "reason", String.format("Got Ingestion Version %s and Server Version %s. They should match.", string, SERVER_VERSION)).toString()).build();
        } catch (Exception e) {
            throw PipelineServiceClientException.byMessage("Failed to get REST status.", e.getMessage());
        }
    }

    @Override // org.openmetadata.service.util.PipelineServiceClient
    public HttpResponse<String> testConnection(TestServiceConnection testServiceConnection) {
        try {
            HttpResponse<String> post = post(String.format("%s/%s/test_connection", this.serviceURL, API_ENDPOINT), JsonUtils.pojoToJson(testServiceConnection));
            if (post.statusCode() == 200) {
                return post;
            }
            throw new PipelineServiceClientException(String.format("Failed to test connection due to %s", post.body()));
        } catch (Exception e) {
            throw PipelineServiceClientException.byMessage("Failed to test connection.", e.getMessage());
        }
    }

    @Override // org.openmetadata.service.util.PipelineServiceClient
    public HttpResponse<String> killIngestion(IngestionPipeline ingestionPipeline) {
        try {
            String format = String.format("%s/%s/kill", this.serviceURL, API_ENDPOINT);
            JSONObject jSONObject = new JSONObject();
            jSONObject.put(DAG_ID, ingestionPipeline.getName());
            HttpResponse<String> post = post(format, jSONObject.toString());
            if (post.statusCode() == 200) {
                return post;
            }
            throw new PipelineServiceClientException(String.format("Failed to kill running workflows due to %s", post.body()));
        } catch (Exception e) {
            throw PipelineServiceClientException.byMessage("Failed to kill running workflows", e.getMessage());
        }
    }

    @Override // org.openmetadata.service.util.PipelineServiceClient
    public Map<String, String> requestGetHostIp() {
        try {
            HttpResponse<String> requestAuthenticatedForJsonContent = getRequestAuthenticatedForJsonContent("%s/%s/ip", this.serviceURL, API_ENDPOINT);
            if (requestAuthenticatedForJsonContent.statusCode() == 200) {
                return (Map) JsonUtils.readValue((String) requestAuthenticatedForJsonContent.body(), new TypeReference<Map<String, String>>() { // from class: org.openmetadata.service.airflow.AirflowRESTClient.1
                });
            }
            throw new PipelineServiceClientException(String.format("Failed to get Pipeline Service host IP due to %s", requestAuthenticatedForJsonContent.body()));
        } catch (Exception e) {
            throw PipelineServiceClientException.byMessage("Failed to get Pipeline Service host IP.", e.getMessage());
        }
    }

    @Override // org.openmetadata.service.util.PipelineServiceClient
    public Map<String, String> getLastIngestionLogs(IngestionPipeline ingestionPipeline, String str) {
        String str2 = TYPE_TO_TASK.get(ingestionPipeline.getPipelineType().toString());
        String str3 = BotTokenCache.EMPTY_STRING;
        if (str != null) {
            str3 = String.format("&after=%s", str);
        }
        try {
            HttpResponse<String> requestAuthenticatedForJsonContent = getRequestAuthenticatedForJsonContent("%s/%s/last_dag_logs?dag_id=%s&task_id=%s%s", this.serviceURL, API_ENDPOINT, ingestionPipeline.getName(), str2, str3);
            if (requestAuthenticatedForJsonContent.statusCode() == 200) {
                return (Map) JsonUtils.readValue((String) requestAuthenticatedForJsonContent.body(), new TypeReference<Map<String, String>>() { // from class: org.openmetadata.service.airflow.AirflowRESTClient.2
                });
            }
            throw new PipelineServiceClientException(String.format("Failed to get last ingestion logs due to %s", requestAuthenticatedForJsonContent.body()));
        } catch (Exception e) {
            throw PipelineServiceClientException.byMessage("Failed to get last ingestion logs.", e.getMessage());
        }
    }

    private HttpResponse<String> getRequestAuthenticatedForJsonContent(String str, Object... objArr) throws IOException, InterruptedException {
        return this.client.send(authenticatedRequestBuilder(str, objArr).GET().build(), HttpResponse.BodyHandlers.ofString());
    }

    private HttpResponse<String> deleteRequestAuthenticatedForJsonContent(String str, Object... objArr) throws IOException, InterruptedException {
        return this.client.send(authenticatedRequestBuilder(str, objArr).DELETE().build(), HttpResponse.BodyHandlers.ofString());
    }

    private HttpRequest.Builder authenticatedRequestBuilder(String str, Object... objArr) {
        return HttpRequest.newBuilder(URI.create(String.format(str, objArr))).header("Content-Type", "application/json").header(JwtFilter.AUTHORIZATION_HEADER, getBasicAuthenticationHeader(this.username, this.password));
    }

    private HttpResponse<String> getRequestNoAuthForJsonContent(Object... objArr) throws IOException, InterruptedException {
        return this.client.send(HttpRequest.newBuilder(URI.create(String.format("%s/%s/health", objArr))).header("Content-Type", "application/json").GET().build(), HttpResponse.BodyHandlers.ofString());
    }
}
