package org.openmetadata.service.clients.pipeline.airflow;

import com.fasterxml.jackson.core.type.TypeReference;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.security.KeyStoreException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import javax.net.ssl.SSLContext;
import javax.ws.rs.core.Response;
import org.apache.http.client.utils.URIBuilder;
import org.json.JSONObject;
import org.openmetadata.schema.ServiceEntityInterface;
import org.openmetadata.schema.api.configuration.pipelineServiceClient.PipelineServiceClientConfiguration;
import org.openmetadata.schema.entity.app.App;
import org.openmetadata.schema.entity.app.AppMarketPlaceDefinition;
import org.openmetadata.schema.entity.automations.Workflow;
import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline;
import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineServiceClientResponse;
import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineStatus;
import org.openmetadata.sdk.exception.PipelineServiceClientException;
import org.openmetadata.service.Entity;
import org.openmetadata.service.clients.pipeline.PipelineServiceClient;
import org.openmetadata.service.exception.IngestionPipelineDeploymentException;
import org.openmetadata.service.migration.api.MigrationWorkflow;
import org.openmetadata.service.search.SearchClient;
import org.openmetadata.service.security.JwtFilter;
import org.openmetadata.service.util.JsonUtils;
import org.openmetadata.service.util.SSLUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/openmetadata/service/clients/pipeline/airflow/AirflowRESTClient.class */
public class AirflowRESTClient extends PipelineServiceClient {
    // Platform label handed to setPlatform(...) in the constructor and to the SSL context factory.
    private static final String PLATFORM = "Airflow";

    // Names of the entries read from the additional properties of the configuration parameters.
    private static final String USERNAME_KEY = "username";
    private static final String PASSWORD_KEY = "password";
    private static final String TIMEOUT_KEY = "timeout";
    private static final String TRUSTSTORE_PATH_KEY = "truststorePath";
    private static final String TRUSTSTORE_PASSWORD_KEY = "truststorePassword";

    // Hint appended to the unhealthy-status messages built by getServiceStatusInternal().
    private static final String DOCS_LINK = "Follow [this guide](https://docs.open-metadata.org/deployment/ingestion/openmetadata) for further details.";

    // Name of the request field / query parameter that carries the pipeline name.
    private static final String DAG_ID = "dag_id";

    private static final Logger LOG = LoggerFactory.getLogger(AirflowRESTClient.class);

    // Path segments appended to the service URL before the endpoint name (see buildURI).
    private static final List<String> API_ENDPOINT_SEGMENTS = List.of(Entity.API, "v1", "openmetadata");

    // Basic-auth credentials sent with authenticated requests.
    protected final String username;
    protected final String password;

    // HTTP client shared by every request issued by this instance.
    protected final HttpClient client;

    // Validated base URL of the Airflow host.
    protected final URL serviceURL;

    /**
     * Creates a client for the Airflow REST API described by the given configuration.
     *
     * <p>The credentials and the connect timeout (in seconds) are read from the additional
     * properties of the configuration parameters. The API endpoint is validated and stored as the
     * service URL. The HTTP client always speaks HTTP/1.1; a custom SSL context is installed only
     * when {@code createAirflowSSLContext} returns a non-null one.
     *
     * @param pipelineServiceClientConfiguration pipeline service client configuration
     * @throws KeyStoreException propagated from the SSL context creation
     */
    public AirflowRESTClient(PipelineServiceClientConfiguration pipelineServiceClientConfiguration) throws KeyStoreException {
        super(pipelineServiceClientConfiguration);
        setPlatform(PLATFORM);

        Map<String, Object> properties = pipelineServiceClientConfiguration.getParameters().getAdditionalProperties();
        this.username = (String) properties.get(USERNAME_KEY);
        this.password = (String) properties.get(PASSWORD_KEY);
        this.serviceURL = validateServiceURL(pipelineServiceClientConfiguration.getApiEndpoint());

        SSLContext sslContext = createAirflowSSLContext(pipelineServiceClientConfiguration);

        int timeoutSeconds = (Integer) properties.get(TIMEOUT_KEY);
        HttpClient.Builder clientBuilder = HttpClient.newBuilder()
                .version(HttpClient.Version.HTTP_1_1)
                .connectTimeout(Duration.ofSeconds(timeoutSeconds));
        if (sslContext != null) {
            clientBuilder = clientBuilder.sslContext(sslContext);
        }
        this.client = clientBuilder.build();
    }

    /**
     * Builds the SSL context for the Airflow connection from the truststore path and password found
     * in the additional properties of the configuration parameters.
     *
     * @param pipelineServiceClientConfiguration pipeline service client configuration
     * @return whatever {@code SSLUtil.createSSLContext} returns for those settings; the constructor
     *     treats {@code null} as "no custom SSL context"
     * @throws KeyStoreException propagated from {@code SSLUtil.createSSLContext}
     */
    private static SSLContext createAirflowSSLContext(PipelineServiceClientConfiguration pipelineServiceClientConfiguration) throws KeyStoreException {
        String truststorePath = (String) pipelineServiceClientConfiguration.getParameters().getAdditionalProperties().get(TRUSTSTORE_PATH_KEY);
        String truststorePassword = (String) pipelineServiceClientConfiguration.getParameters().getAdditionalProperties().get(TRUSTSTORE_PASSWORD_KEY);
        return SSLUtil.createSSLContext(truststorePath, truststorePassword, PLATFORM);
    }

    /**
     * Sends a JSON POST request and returns the response with its body read as a string.
     *
     * @param str target URI of the request
     * @param str2 request body
     * @param z when {@code true}, a basic-authentication header built from the configured
     *     username and password is added
     * @return the HTTP response
     * @throws IOException if sending or receiving fails
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public final HttpResponse<String> post(String str, String str2, boolean z) throws IOException, InterruptedException {
        HttpRequest.Builder requestBuilder = HttpRequest.newBuilder(URI.create(str));
        requestBuilder = requestBuilder.header("Content-Type", "application/json");
        requestBuilder = requestBuilder.POST(HttpRequest.BodyPublishers.ofString(str2));
        if (z) {
            String basicAuth = getBasicAuthenticationHeader(this.username, this.password);
            requestBuilder = requestBuilder.header(JwtFilter.AUTHORIZATION_HEADER, basicAuth);
        }
        HttpRequest request = requestBuilder.build();
        return this.client.send(request, HttpResponse.BodyHandlers.ofString());
    }

    /**
     * Sends an authenticated JSON POST request; shorthand for the three-argument overload with
     * authentication switched on.
     *
     * @param str target URI of the request
     * @param str2 request body
     * @return the HTTP response with its body read as a string
     * @throws IOException if sending or receiving fails
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public final HttpResponse<String> post(String str, String str2) throws IOException, InterruptedException {
        final boolean authenticated = true;
        return post(str, str2, authenticated);
    }

    /**
     * Deploys an ingestion pipeline by posting its JSON form to the {@code deploy} endpoint.
     *
     * <p>On a 200 response the pipeline is flagged as deployed and the response body is returned as
     * the reason. The {@code serviceEntityInterface} argument is not used by this implementation.
     *
     * @param ingestionPipeline pipeline to deploy
     * @param serviceEntityInterface service the pipeline belongs to (unused here)
     * @return a 200 response carrying the body returned by Airflow
     * @throws PipelineServiceClientException if Airflow answers with any status other than 200
     * @throws IngestionPipelineDeploymentException if the request cannot be built or sent, or the
     *     thread is interrupted
     */
    public PipelineServiceClientResponse deployPipeline(IngestionPipeline ingestionPipeline, ServiceEntityInterface serviceEntityInterface) {
        try {
            String deployUri = buildURI("deploy").build().toString();
            String payload = JsonUtils.pojoToJson(ingestionPipeline);
            HttpResponse<String> response = post(deployUri, payload);
            if (response.statusCode() == 200) {
                ingestionPipeline.setDeployed(true);
                return getResponse(200, response.body());
            }
            throw new PipelineServiceClientException(String.format("%s Failed to deploy Ingestion Pipeline due to airflow API returned %s and response %s", ingestionPipeline.getName(), Response.Status.fromStatusCode(response.statusCode()), response.body()));
        } catch (IOException | URISyntaxException requestFailure) {
            throw IngestionPipelineDeploymentException.byMessage(ingestionPipeline.getName(), "DEPLOYMENT_ERROR", requestFailure.getMessage());
        } catch (InterruptedException interrupted) {
            // Restore the interrupt flag before translating the exception.
            Thread.currentThread().interrupt();
            throw IngestionPipelineDeploymentException.byMessage(ingestionPipeline.getName(), "DEPLOYMENT_ERROR", interrupted.getMessage());
        }
    }

    /**
     * Deletes the DAG of an ingestion pipeline through the Airflow delete endpoint.
     *
     * <p>This is best effort: the method never throws for transport problems or non-200 answers.
     * Instead it logs the failure (including the underlying exception, or the status and body
     * Airflow returned) and reports a 500 response to the caller.
     *
     * @param ingestionPipeline pipeline whose DAG should be removed
     * @return a 200 response with the Airflow body on success, otherwise a 500 response with a
     *     generic failure message
     */
    public PipelineServiceClientResponse deletePipeline(IngestionPipeline ingestionPipeline) {
        String name = ingestionPipeline.getName();
        String failureMessage = String.format("Failed to delete Airflow Pipeline %s from Airflow DAGS", name);
        try {
            URIBuilder uriBuilder = buildURI(SearchClient.DELETE);
            uriBuilder.addParameter(DAG_ID, name);
            HttpResponse<String> response = deleteRequestAuthenticatedForJsonContent(uriBuilder.build().toString());
            if (response.statusCode() == 200) {
                return getResponse(200, response.body());
            }
            // Keep the real answer in the logs; the caller only sees the generic 500 below.
            LOG.error(String.format("%s: got status code [%s] and response [%s]", failureMessage, Integer.valueOf(response.statusCode()), response.body()));
        } catch (IOException | URISyntaxException e) {
            LOG.error(failureMessage, e);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can react to it.
            Thread.currentThread().interrupt();
            LOG.error(failureMessage, e);
        }
        return getResponse(500, failureMessage);
    }

    /**
     * Triggers a run of an ingestion pipeline by posting its name as {@code dag_id} to the
     * {@code trigger} endpoint. The {@code serviceEntityInterface} argument is not used by this
     * implementation.
     *
     * @param ingestionPipeline pipeline to trigger
     * @param serviceEntityInterface service the pipeline belongs to (unused here)
     * @return a 200 response carrying the body returned by Airflow
     * @throws IngestionPipelineDeploymentException if Airflow answers with a status other than 200,
     *     the request cannot be built or sent, or the thread is interrupted
     */
    public PipelineServiceClientResponse runPipeline(IngestionPipeline ingestionPipeline, ServiceEntityInterface serviceEntityInterface) {
        String pipelineName = ingestionPipeline.getName();
        try {
            String triggerUri = buildURI("trigger").build().toString();
            JSONObject payload = new JSONObject();
            payload.put(DAG_ID, pipelineName);
            HttpResponse<String> response = post(triggerUri, payload.toString());
            if (response.statusCode() != 200) {
                throw IngestionPipelineDeploymentException.byMessage(pipelineName, "TRIGGER_ERROR", "Failed to trigger IngestionPipeline", Response.Status.fromStatusCode(response.statusCode()));
            }
            return getResponse(200, response.body());
        } catch (IOException | URISyntaxException requestFailure) {
            throw IngestionPipelineDeploymentException.byMessage(pipelineName, "TRIGGER_ERROR", requestFailure.getMessage());
        } catch (InterruptedException interrupted) {
            // Restore the interrupt flag before translating the exception.
            Thread.currentThread().interrupt();
            throw IngestionPipelineDeploymentException.byMessage(pipelineName, "TRIGGER_ERROR", interrupted.getMessage());
        }
    }

    /**
     * Flips the enabled state of an ingestion pipeline in Airflow.
     *
     * <p>An enabled pipeline is sent to the {@code disable} endpoint, any other pipeline to the
     * {@code enable} endpoint. On 200 the pipeline's enabled flag is updated to the new state; on
     * 404 the pipeline is flagged as not deployed. Both cases return the Airflow body with the
     * matching status code.
     *
     * @param ingestionPipeline pipeline to toggle; its enabled flag must not be {@code null}
     * @return a 200 or 404 response carrying the body returned by Airflow
     * @throws PipelineServiceClientException for any other status code, when the request cannot be
     *     built or sent, or when the thread is interrupted
     */
    public PipelineServiceClientResponse toggleIngestion(IngestionPipeline ingestionPipeline) {
        try {
            JSONObject payload = new JSONObject();
            payload.put(DAG_ID, ingestionPipeline.getName());

            boolean currentlyEnabled = ingestionPipeline.getEnabled().equals(Boolean.TRUE);
            String action = currentlyEnabled ? "disable" : "enable";
            HttpResponse<String> response = post(buildURI(action).build().toString(), payload.toString());

            int status = response.statusCode();
            if (status == 200) {
                ingestionPipeline.setEnabled(!currentlyEnabled);
                return getResponse(200, response.body());
            }
            if (status == 404) {
                // Airflow does not know the DAG, so it cannot be considered deployed.
                ingestionPipeline.setDeployed(false);
                return getResponse(404, response.body());
            }
            throw clientException(ingestionPipeline, "Failed to toggle ingestion pipeline state", response);
        } catch (IOException | URISyntaxException requestFailure) {
            throw clientException(ingestionPipeline, requestFailure);
        } catch (InterruptedException interrupted) {
            // Restore the interrupt flag before translating the exception.
            Thread.currentThread().interrupt();
            throw clientException(ingestionPipeline, interrupted);
        }
    }

    /**
     * Fetches the queued statuses of an ingestion pipeline from the {@code status} endpoint
     * (query parameters {@code dag_id} and {@code only_queued=true}).
     *
     * @param ingestionPipeline pipeline whose queued runs are requested
     * @return the statuses parsed from the response body on 200; an empty list (after logging the
     *     status code and body) for any other status
     * @throws PipelineServiceClientException if the request cannot be built or sent, or the thread
     *     is interrupted
     */
    public List<PipelineStatus> getQueuedPipelineStatusInternal(IngestionPipeline ingestionPipeline) {
        try {
            URIBuilder statusUri = buildURI("status");
            statusUri.addParameter(DAG_ID, ingestionPipeline.getName());
            statusUri.addParameter("only_queued", "true");
            HttpResponse<String> response = getRequestAuthenticatedForJsonContent(statusUri.build().toString());
            if (response.statusCode() != 200) {
                LOG.error(String.format("Got status code [%s] trying to get queued statuses: [%s]", Integer.valueOf(response.statusCode()), response.body()));
                return new ArrayList<>();
            }
            return JsonUtils.readObjects(response.body(), PipelineStatus.class);
        } catch (IOException | URISyntaxException requestFailure) {
            throw clientException(ingestionPipeline, requestFailure);
        } catch (InterruptedException interrupted) {
            // Restore the interrupt flag before translating the exception.
            Thread.currentThread().interrupt();
            throw clientException(ingestionPipeline, interrupted);
        }
    }

    /**
     * Checks the health of the Airflow APIs through the authenticated {@code health-auth} endpoint.
     *
     * <p>A 200 answer is healthy only when the reported {@code version} passes
     * {@code validServerClientVersions}; otherwise a version-mismatch message is returned. 401/403,
     * 404 and every other status map to dedicated unhealthy messages. Transport failures are also
     * reported as unhealthy statuses rather than thrown.
     *
     * <p>NOTE(review): a 200 response whose body is not JSON or lacks {@code version} makes
     * {@code JSONObject} throw; that exception is not handled here.
     *
     * @return a healthy or unhealthy status describing the Airflow service
     */
    @Override
    public PipelineServiceClientResponse getServiceStatusInternal() {
        try {
            HttpResponse<String> response = getRequestAuthenticatedForJsonContent(buildURI("health-auth").build().toString());
            int statusCode = response.statusCode();
            if (statusCode == 200) {
                String version = new JSONObject(response.body()).getString("version");
                if (Boolean.TRUE.equals(validServerClientVersions(version))) {
                    return buildHealthyStatus(version);
                }
                return buildUnhealthyStatus(buildVersionMismatchErrorMessage(version, SERVER_VERSION));
            }
            if (statusCode == 401 || statusCode == 403) {
                return buildUnhealthyStatus(String.format("Authentication failed for user [%s] trying to access the Airflow APIs.", this.username));
            }
            if (statusCode == 404) {
                return buildUnhealthyStatus(String.format("Airflow APIs not found. Please validate if the OpenMetadata Airflow plugin is installed correctly. %s", DOCS_LINK));
            }
            return buildUnhealthyStatus(String.format("Unexpected status response: code [%s] - [%s]", Integer.valueOf(statusCode), response.body()));
        } catch (IOException | URISyntaxException e) {
            String reason;
            if (e.getMessage() != null) {
                reason = String.format("Failed to get Airflow status due to [%s].", e.getMessage());
            } else {
                // An exception may carry neither a message nor a cause; describe the exception
                // itself in that case instead of dereferencing a null cause.
                Throwable origin = e.getCause() != null ? e.getCause() : e;
                reason = String.format("Failed to connect to Airflow due to %s. Is the host available at %s?", origin.toString(), this.serviceURL.toString());
            }
            return buildUnhealthyStatus(String.format("%s %s", reason, DOCS_LINK));
        } catch (InterruptedException e) {
            // Restore the interrupt flag; the health check still answers with a status.
            Thread.currentThread().interrupt();
            return buildUnhealthyStatus(String.format("Failed to connect to Airflow due to %s. Is the host available at %s? %s.", e.getMessage(), this.serviceURL.toString(), DOCS_LINK));
        }
    }

    /**
     * Runs an automations workflow by posting its JSON form to the {@code run_automation} endpoint.
     *
     * @param workflow workflow to run
     * @return a 200 response carrying the body returned by Airflow
     * @throws PipelineServiceClientException if Airflow answers with any status other than 200
     * @throws IngestionPipelineDeploymentException if the request cannot be built or sent, or the
     *     thread is interrupted
     */
    public PipelineServiceClientResponse runAutomationsWorkflow(Workflow workflow) {
        try {
            HttpResponse<String> response = post(buildURI("run_automation").build().toString(), JsonUtils.pojoToJson(workflow));
            if (response.statusCode() == 200) {
                return getResponse(200, response.body());
            }
            throw new PipelineServiceClientException(String.format("%s Failed to trigger workflow due to airflow API returned %s and response %s", workflow.getName(), Response.Status.fromStatusCode(response.statusCode()), response.body()));
        } catch (IOException | URISyntaxException e) {
            // The message given to the caller is deliberately generic; keep the real cause in the
            // logs so connection problems can still be diagnosed.
            LOG.error(String.format("Failed to run automations workflow %s", workflow.getName()), e);
            throw IngestionPipelineDeploymentException.byMessage(workflow.getName(), "TRIGGER_ERROR", "No response from the test connection. Make sure your service is reachable and accepting connections");
        } catch (InterruptedException e) {
            // Restore the interrupt flag before translating the exception.
            Thread.currentThread().interrupt();
            throw IngestionPipelineDeploymentException.byMessage(workflow.getName(), "TRIGGER_ERROR", e.getMessage());
        }
    }

    /**
     * Runs an application by posting its JSON form to the {@code run_application} endpoint.
     *
     * @param app application to run
     * @return the response produced by {@code sendPost}
     */
    public PipelineServiceClientResponse runApplicationFlow(App app) {
        final String endpoint = "run_application";
        return sendPost(endpoint, app);
    }

    /**
     * Validates an application registration. This implementation performs no remote call and always
     * reports success; the argument is not inspected.
     *
     * @param appMarketPlaceDefinition application definition (unused here)
     * @return a 200 response whose reason is {@code MigrationWorkflow.SUCCESS_MSG}
     */
    public PipelineServiceClientResponse validateAppRegistration(AppMarketPlaceDefinition appMarketPlaceDefinition) {
        final int ok = 200;
        return getResponse(ok, MigrationWorkflow.SUCCESS_MSG);
    }

    /**
     * Serializes an object to JSON and posts it to the given endpoint.
     *
     * <p>Note that the serialized payload itself is used as the identifying text in the error
     * messages and exceptions raised here.
     *
     * @param str endpoint name appended to the API path
     * @param obj object serialized as the request body
     * @return a 200 response carrying the body returned by Airflow
     * @throws PipelineServiceClientException if Airflow answers with any status other than 200
     * @throws IngestionPipelineDeploymentException if the request cannot be built or sent, or the
     *     thread is interrupted
     */
    private PipelineServiceClientResponse sendPost(String str, Object obj) {
        String payload = JsonUtils.pojoToJson(obj);
        try {
            String endpointUri = buildURI(str).build().toString();
            HttpResponse<String> response = post(endpointUri, payload);
            if (response.statusCode() != 200) {
                throw new PipelineServiceClientException(String.format("%s Failed to trigger flow due to airflow API returned %s and response %s", payload, Response.Status.fromStatusCode(response.statusCode()), response.body()));
            }
            return getResponse(200, response.body());
        } catch (IOException | URISyntaxException requestFailure) {
            throw IngestionPipelineDeploymentException.byMessage(payload, "DEPLOYMENT_ERROR", requestFailure.getMessage());
        } catch (InterruptedException interrupted) {
            // Restore the interrupt flag before translating the exception.
            Thread.currentThread().interrupt();
            throw IngestionPipelineDeploymentException.byMessage(payload, "DEPLOYMENT_ERROR", interrupted.getMessage());
        }
    }

    /**
     * Kills the running workflows of an ingestion pipeline by posting its name as {@code dag_id}
     * to the {@code kill} endpoint.
     *
     * @param ingestionPipeline pipeline whose runs should be stopped
     * @return a 200 response carrying the body returned by Airflow
     * @throws PipelineServiceClientException if Airflow answers with a status other than 200, the
     *     request cannot be built or sent, or the thread is interrupted
     */
    public PipelineServiceClientResponse killIngestion(IngestionPipeline ingestionPipeline) {
        try {
            String killUri = buildURI("kill").build().toString();
            JSONObject payload = new JSONObject();
            payload.put(DAG_ID, ingestionPipeline.getName());
            HttpResponse<String> response = post(killUri, payload.toString());
            if (response.statusCode() != 200) {
                throw new PipelineServiceClientException(String.format("Failed to kill running workflows due to %s", response.body()));
            }
            return getResponse(200, response.body());
        } catch (IOException | URISyntaxException requestFailure) {
            throw clientException("Failed to kill running workflows", requestFailure);
        } catch (InterruptedException interrupted) {
            // Restore the interrupt flag before translating the exception.
            Thread.currentThread().interrupt();
            throw clientException("Failed to kill running workflows", interrupted);
        }
    }

    /**
     * Asks the {@code ip} endpoint for the host IP information of the pipeline service.
     *
     * @return the response body parsed as a string-to-string map
     * @throws PipelineServiceClientException if Airflow answers with a status other than 200, the
     *     request cannot be built or sent, or the thread is interrupted
     */
    @Override
    public Map<String, String> requestGetHostIp() {
        try {
            String ipUri = buildURI("ip").build().toString();
            HttpResponse<String> response = getRequestAuthenticatedForJsonContent(ipUri);
            if (response.statusCode() != 200) {
                throw new PipelineServiceClientException(String.format("Failed to get Pipeline Service host IP due to %s", response.body()));
            }
            return (Map<String, String>) JsonUtils.readValue(response.body(), new TypeReference<Map<String, String>>() {
            });
        } catch (IOException | URISyntaxException requestFailure) {
            throw clientException("Failed to get Pipeline Service host IP.", requestFailure);
        } catch (InterruptedException interrupted) {
            // Restore the interrupt flag before translating the exception.
            Thread.currentThread().interrupt();
            throw clientException("Failed to get Pipeline Service host IP.", interrupted);
        }
    }

    /**
     * Retrieves the logs of the last run of an ingestion pipeline from the {@code last_dag_logs}
     * endpoint.
     *
     * <p>The task id sent to Airflow is looked up in {@code TYPE_TO_TASK} by the pipeline type. The
     * {@code after} query parameter is only added when {@code str} is not {@code null}.
     *
     * @param ingestionPipeline pipeline whose logs are requested
     * @param str value for the optional {@code after} query parameter, or {@code null}
     * @return the response body parsed as a string-to-string map
     * @throws PipelineServiceClientException if Airflow answers with a status other than 200, the
     *     request cannot be built or sent, or the thread is interrupted
     */
    public Map<String, String> getLastIngestionLogs(IngestionPipeline ingestionPipeline, String str) {
        String taskId = (String) TYPE_TO_TASK.get(ingestionPipeline.getPipelineType().toString());
        URIBuilder logsUri = buildURI("last_dag_logs");
        if (str != null) {
            logsUri.addParameter("after", str);
        }
        logsUri.addParameter(DAG_ID, ingestionPipeline.getName());
        logsUri.addParameter("task_id", taskId);
        try {
            HttpResponse<String> response = getRequestAuthenticatedForJsonContent(logsUri.build().toString());
            if (response.statusCode() != 200) {
                throw new PipelineServiceClientException(String.format("Failed to get last ingestion logs due to %s", response.body()));
            }
            return (Map<String, String>) JsonUtils.readValue(response.body(), new TypeReference<Map<String, String>>() {
            });
        } catch (IOException | URISyntaxException requestFailure) {
            throw clientException("Failed to get last ingestion logs.", requestFailure);
        } catch (InterruptedException interrupted) {
            // Restore the interrupt flag before translating the exception.
            Thread.currentThread().interrupt();
            throw clientException("Failed to get last ingestion logs.", interrupted);
        }
    }

    /**
     * Creates a URI builder for one endpoint of the API.
     *
     * <p>The resulting path is the path of the service URL, followed by the fixed API segments and
     * finally the endpoint name.
     *
     * @param str endpoint name used as the last path segment
     * @return a builder positioned at the endpoint, ready for query parameters
     * @throws PipelineServiceClientException if anything goes wrong while assembling the URI
     */
    public URIBuilder buildURI(String str) {
        try {
            URIBuilder endpointBuilder = new URIBuilder(String.valueOf(this.serviceURL));
            List<String> pathSegments = new ArrayList<>(endpointBuilder.getPathSegments());
            pathSegments.addAll(API_ENDPOINT_SEGMENTS);
            pathSegments.add(str);
            return endpointBuilder.setPathSegments(pathSegments);
        } catch (Exception failure) {
            throw clientException(String.format("Failed to built request URI for path [%s].", str), failure);
        }
    }

    /**
     * Sends an authenticated GET request and returns the response with its body read as a string.
     *
     * @param str target URI of the request
     * @return the HTTP response
     * @throws IOException if sending or receiving fails
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    private HttpResponse<String> getRequestAuthenticatedForJsonContent(String str) throws IOException, InterruptedException {
        HttpRequest getRequest = authenticatedRequestBuilder(str).GET().build();
        return this.client.send(getRequest, HttpResponse.BodyHandlers.ofString());
    }

    /**
     * Sends an authenticated DELETE request and returns the response with its body read as a string.
     *
     * @param str target URI of the request
     * @return the HTTP response
     * @throws IOException if sending or receiving fails
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    private HttpResponse<String> deleteRequestAuthenticatedForJsonContent(String str) throws IOException, InterruptedException {
        HttpRequest deleteRequest = authenticatedRequestBuilder(str).DELETE().build();
        return this.client.send(deleteRequest, HttpResponse.BodyHandlers.ofString());
    }

    /**
     * Starts a request for the given URI with the JSON content type and the basic-authentication
     * header built from the configured username and password. The HTTP method is left to the
     * caller.
     *
     * @param str target URI of the request
     * @return the prepared request builder
     */
    private HttpRequest.Builder authenticatedRequestBuilder(String str) {
        HttpRequest.Builder requestBuilder = HttpRequest.newBuilder(URI.create(str));
        requestBuilder = requestBuilder.header("Content-Type", "application/json");
        String basicAuth = getBasicAuthenticationHeader(this.username, this.password);
        return requestBuilder.header(JwtFilter.AUTHORIZATION_HEADER, basicAuth);
    }

    /**
     * Builds a client response with the given code and reason, tagged with this client's platform.
     *
     * @param i status code to report
     * @param str reason text to report
     * @return the populated response
     */
    private PipelineServiceClientResponse getResponse(int i, String str) {
        PipelineServiceClientResponse response = new PipelineServiceClientResponse();
        return response
                .withCode(Integer.valueOf(i))
                .withReason(str)
                .withPlatform(getPlatform());
    }

    /**
     * Creates (does not throw) a client exception from a context text and the message of the
     * triggering exception.
     *
     * @param str context text passed first to {@code PipelineServiceClientException.byMessage}
     * @param exc exception whose message is passed second
     * @return the exception for the caller to throw
     */
    private PipelineServiceClientException clientException(String str, Exception exc) {
        String causeMessage = exc.getMessage();
        return PipelineServiceClientException.byMessage(str, causeMessage);
    }

    /**
     * Creates (does not throw) a client exception that uses the pipeline name as the context text.
     *
     * @param ingestionPipeline pipeline the failed operation was about
     * @param exc exception that triggered the failure
     * @return the exception for the caller to throw
     */
    private PipelineServiceClientException clientException(IngestionPipeline ingestionPipeline, Exception exc) {
        String pipelineName = ingestionPipeline.getName();
        return clientException(pipelineName, exc);
    }

    /**
     * Creates (does not throw) a client exception for an unexpected HTTP answer.
     *
     * @param ingestionPipeline pipeline the failed operation was about; its name is passed first
     * @param str description of the failure
     * @param httpResponse response whose status code is converted to a {@code Response.Status}
     * @return the exception for the caller to throw
     */
    private PipelineServiceClientException clientException(IngestionPipeline ingestionPipeline, String str, HttpResponse<String> httpResponse) {
        String pipelineName = ingestionPipeline.getName();
        Response.Status status = Response.Status.fromStatusCode(httpResponse.statusCode());
        return PipelineServiceClientException.byMessage(pipelineName, str, status);
    }
}
