package org.openmetadata.service.clients.pipeline.airflow;

import com.fasterxml.jackson.core.type.TypeReference;
import java.io.IOException;
import java.net.URI;
import java.net.URL;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.security.KeyStoreException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import javax.net.ssl.SSLContext;
import javax.ws.rs.core.Response;
import org.json.JSONObject;
import org.openmetadata.schema.ServiceEntityInterface;
import org.openmetadata.schema.api.configuration.pipelineServiceClient.PipelineServiceClientConfiguration;
import org.openmetadata.schema.entity.automations.Workflow;
import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline;
import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineServiceClientResponse;
import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineStatus;
import org.openmetadata.sdk.PipelineServiceClient;
import org.openmetadata.sdk.exception.PipelineServiceClientException;
import org.openmetadata.service.exception.IngestionPipelineDeploymentException;
import org.openmetadata.service.security.JwtFilter;
import org.openmetadata.service.security.auth.BotTokenCache;
import org.openmetadata.service.util.JsonUtils;
import org.openmetadata.service.util.SSLUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/openmetadata/service/clients/pipeline/airflow/AirflowRESTClient.class */
public class AirflowRESTClient extends PipelineServiceClient {
    private static final Logger LOG = LoggerFactory.getLogger(AirflowRESTClient.class);
    private static final String PLATFORM = "Airflow";
    private static final String USERNAME_KEY = "username";
    private static final String PASSWORD_KEY = "password";
    private static final String TIMEOUT_KEY = "timeout";
    private static final String TRUSTSTORE_PATH_KEY = "truststorePath";
    private static final String TRUSTSTORE_PASSWORD_KEY = "truststorePassword";
    protected final String username;
    protected final String password;
    protected final HttpClient client;
    protected final URL serviceURL;
    private static final String API_ENDPOINT = "api/v1/openmetadata";
    private static final String DAG_ID = "dag_id";

    public AirflowRESTClient(PipelineServiceClientConfiguration pipelineServiceClientConfiguration) throws KeyStoreException {
        super(pipelineServiceClientConfiguration);
        setPlatform(PLATFORM);
        this.username = (String) pipelineServiceClientConfiguration.getParameters().getAdditionalProperties().get(USERNAME_KEY);
        this.password = (String) pipelineServiceClientConfiguration.getParameters().getAdditionalProperties().get("password");
        this.serviceURL = validateServiceURL(pipelineServiceClientConfiguration.getApiEndpoint());
        SSLContext createAirflowSSLContext = createAirflowSSLContext(pipelineServiceClientConfiguration);
        HttpClient.Builder connectTimeout = HttpClient.newBuilder().version(HttpClient.Version.HTTP_1_1).connectTimeout(Duration.ofSeconds(((Integer) pipelineServiceClientConfiguration.getParameters().getAdditionalProperties().get(TIMEOUT_KEY)).intValue()));
        if (createAirflowSSLContext == null) {
            this.client = connectTimeout.build();
        } else {
            this.client = connectTimeout.sslContext(createAirflowSSLContext).build();
        }
    }

    private static SSLContext createAirflowSSLContext(PipelineServiceClientConfiguration pipelineServiceClientConfiguration) throws KeyStoreException {
        return SSLUtil.createSSLContext((String) pipelineServiceClientConfiguration.getParameters().getAdditionalProperties().get(TRUSTSTORE_PATH_KEY), (String) pipelineServiceClientConfiguration.getParameters().getAdditionalProperties().get(TRUSTSTORE_PASSWORD_KEY), PLATFORM);
    }

    public final HttpResponse<String> post(String str, String str2, boolean z) throws IOException, InterruptedException {
        HttpRequest.Builder POST = HttpRequest.newBuilder(URI.create(str)).header("Content-Type", "application/json").POST(HttpRequest.BodyPublishers.ofString(str2));
        if (z) {
            POST.header(JwtFilter.AUTHORIZATION_HEADER, getBasicAuthenticationHeader(this.username, this.password));
        }
        return this.client.send(POST.build(), HttpResponse.BodyHandlers.ofString());
    }

    public final HttpResponse<String> post(String str, String str2) throws IOException, InterruptedException {
        return post(str, str2, true);
    }

    public PipelineServiceClientResponse deployPipeline(IngestionPipeline ingestionPipeline, ServiceEntityInterface serviceEntityInterface) {
        try {
            HttpResponse<String> post = post(String.format("%s/%s/deploy", this.serviceURL, API_ENDPOINT), JsonUtils.pojoToJson(ingestionPipeline));
            if (post.statusCode() != 200) {
                throw new PipelineServiceClientException(String.format("%s Failed to deploy Ingestion Pipeline due to airflow API returned %s and response %s", ingestionPipeline.getName(), Response.Status.fromStatusCode(post.statusCode()), post.body()));
            }
            ingestionPipeline.setDeployed(true);
            return new PipelineServiceClientResponse().withCode(200).withReason((String) post.body()).withPlatform(getPlatform());
        } catch (Exception e) {
            throw IngestionPipelineDeploymentException.byMessage(ingestionPipeline.getName(), e.getMessage());
        }
    }

    public PipelineServiceClientResponse deletePipeline(IngestionPipeline ingestionPipeline) {
        String name = ingestionPipeline.getName();
        try {
            if (deleteRequestAuthenticatedForJsonContent("%s/%s/delete?dag_id=%s", this.serviceURL, API_ENDPOINT, name).statusCode() == 200) {
                return new PipelineServiceClientResponse().withCode(200).withPlatform(getPlatform());
            }
        } catch (Exception e) {
            LOG.error(String.format("Failed to delete Airflow Pipeline %s from Airflow DAGS", name));
        }
        return new PipelineServiceClientResponse().withCode(500).withReason(String.format("Failed to delete Airflow Pipeline %s from Airflow DAGS", name)).withPlatform(getPlatform());
    }

    public PipelineServiceClientResponse runPipeline(IngestionPipeline ingestionPipeline, ServiceEntityInterface serviceEntityInterface) {
        String name = ingestionPipeline.getName();
        try {
            String format = String.format("%s/%s/trigger", this.serviceURL, API_ENDPOINT);
            JSONObject jSONObject = new JSONObject();
            jSONObject.put(DAG_ID, name);
            HttpResponse<String> post = post(format, jSONObject.toString());
            if (post.statusCode() == 200) {
                return new PipelineServiceClientResponse().withCode(200).withReason((String) post.body()).withPlatform(getPlatform());
            }
            throw IngestionPipelineDeploymentException.byMessage(name, "Failed to trigger IngestionPipeline", Response.Status.fromStatusCode(post.statusCode()));
        } catch (Exception e) {
            throw IngestionPipelineDeploymentException.byMessage(name, e.getMessage());
        }
    }

    public PipelineServiceClientResponse toggleIngestion(IngestionPipeline ingestionPipeline) {
        HttpResponse<String> post;
        try {
            JSONObject jSONObject = new JSONObject();
            jSONObject.put(DAG_ID, ingestionPipeline.getName());
            if (ingestionPipeline.getEnabled().equals(Boolean.TRUE)) {
                post = post(String.format("%s/%s/disable", this.serviceURL, API_ENDPOINT), jSONObject.toString());
                if (post.statusCode() == 200) {
                    ingestionPipeline.setEnabled(false);
                    return new PipelineServiceClientResponse().withCode(200).withReason((String) post.body()).withPlatform(getPlatform());
                }
                if (post.statusCode() == 404) {
                    ingestionPipeline.setDeployed(false);
                    return new PipelineServiceClientResponse().withCode(404).withReason((String) post.body()).withPlatform(getPlatform());
                }
            } else {
                post = post(String.format("%s/%s/enable", this.serviceURL, API_ENDPOINT), jSONObject.toString());
                if (post.statusCode() == 200) {
                    ingestionPipeline.setEnabled(true);
                    return new PipelineServiceClientResponse().withCode(200).withReason((String) post.body()).withPlatform(getPlatform());
                }
                if (post.statusCode() == 404) {
                    ingestionPipeline.setDeployed(false);
                    return new PipelineServiceClientResponse().withCode(404).withReason((String) post.body()).withPlatform(getPlatform());
                }
            }
            throw PipelineServiceClientException.byMessage(ingestionPipeline.getName(), "Failed to toggle ingestion pipeline state", Response.Status.fromStatusCode(post.statusCode()));
        } catch (Exception e) {
            throw PipelineServiceClientException.byMessage(ingestionPipeline.getName(), e.getMessage());
        }
    }

    public List<PipelineStatus> getQueuedPipelineStatus(IngestionPipeline ingestionPipeline) {
        try {
            HttpResponse<String> requestAuthenticatedForJsonContent = getRequestAuthenticatedForJsonContent("%s/%s/status?dag_id=%s&only_queued=true", this.serviceURL, API_ENDPOINT, ingestionPipeline.getName());
            if (requestAuthenticatedForJsonContent.statusCode() == 200) {
                return JsonUtils.readObjects((String) requestAuthenticatedForJsonContent.body(), PipelineStatus.class);
            }
            LOG.error(String.format("Got status code [%s] trying to get queued statuses: [%s]", Integer.valueOf(requestAuthenticatedForJsonContent.statusCode()), requestAuthenticatedForJsonContent.body()));
            return new ArrayList();
        } catch (Exception e) {
            throw PipelineServiceClientException.byMessage(ingestionPipeline.getName(), e.getMessage());
        }
    }

    public PipelineServiceClientResponse getServiceStatus() {
        try {
            HttpResponse<String> requestAuthenticatedForJsonContent = getRequestAuthenticatedForJsonContent("%s/%s/health-auth", this.serviceURL, API_ENDPOINT);
            if (requestAuthenticatedForJsonContent.statusCode() == 200) {
                String string = new JSONObject((String) requestAuthenticatedForJsonContent.body()).getString("version");
                return Boolean.TRUE.equals(validServerClientVersions(string)) ? buildHealthyStatus(string) : buildUnhealthyStatus(buildVersionMismatchErrorMessage(string, SERVER_VERSION));
            }
            if (requestAuthenticatedForJsonContent.statusCode() == 401 || requestAuthenticatedForJsonContent.statusCode() == 403) {
                return buildUnhealthyStatus(String.format("Authentication failed for user [%s] trying to access the Airflow APIs.", this.username));
            }
            if (requestAuthenticatedForJsonContent.statusCode() == 404) {
                return buildUnhealthyStatus("Airflow APIs not found. Please follow the installation guide.");
            }
            throw new PipelineServiceClientException(String.format("Failed to get REST status due to %s.", requestAuthenticatedForJsonContent.body()));
        } catch (Exception e) {
            return buildUnhealthyStatus(String.format("Failed to get REST status due to [%s].", e.getMessage()));
        }
    }

    public PipelineServiceClientResponse runAutomationsWorkflow(Workflow workflow) {
        try {
            HttpResponse<String> post = post(String.format("%s/%s/run_automation", this.serviceURL, API_ENDPOINT), JsonUtils.pojoToJson(workflow));
            if (post.statusCode() == 200) {
                return new PipelineServiceClientResponse().withCode(200).withReason((String) post.body()).withPlatform(getPlatform());
            }
            throw new PipelineServiceClientException(String.format("%s Failed to trigger workflow due to airflow API returned %s and response %s", workflow.getName(), Response.Status.fromStatusCode(post.statusCode()), post.body()));
        } catch (Exception e) {
            throw IngestionPipelineDeploymentException.byMessage(workflow.getName(), e.getMessage());
        }
    }

    public PipelineServiceClientResponse killIngestion(IngestionPipeline ingestionPipeline) {
        try {
            String format = String.format("%s/%s/kill", this.serviceURL, API_ENDPOINT);
            JSONObject jSONObject = new JSONObject();
            jSONObject.put(DAG_ID, ingestionPipeline.getName());
            HttpResponse<String> post = post(format, jSONObject.toString());
            if (post.statusCode() == 200) {
                return new PipelineServiceClientResponse().withCode(200).withReason((String) post.body()).withPlatform(getPlatform());
            }
            throw new PipelineServiceClientException(String.format("Failed to kill running workflows due to %s", post.body()));
        } catch (Exception e) {
            throw PipelineServiceClientException.byMessage("Failed to kill running workflows", e.getMessage());
        }
    }

    public Map<String, String> requestGetHostIp() {
        try {
            HttpResponse<String> requestAuthenticatedForJsonContent = getRequestAuthenticatedForJsonContent("%s/%s/ip", this.serviceURL, API_ENDPOINT);
            if (requestAuthenticatedForJsonContent.statusCode() == 200) {
                return (Map) JsonUtils.readValue((String) requestAuthenticatedForJsonContent.body(), new TypeReference<Map<String, String>>() { // from class: org.openmetadata.service.clients.pipeline.airflow.AirflowRESTClient.1
                });
            }
            throw new PipelineServiceClientException(String.format("Failed to get Pipeline Service host IP due to %s", requestAuthenticatedForJsonContent.body()));
        } catch (Exception e) {
            throw PipelineServiceClientException.byMessage("Failed to get Pipeline Service host IP.", e.getMessage());
        }
    }

    public Map<String, String> getLastIngestionLogs(IngestionPipeline ingestionPipeline, String str) {
        String str2 = (String) TYPE_TO_TASK.get(ingestionPipeline.getPipelineType().toString());
        String str3 = BotTokenCache.EMPTY_STRING;
        if (str != null) {
            str3 = String.format("&after=%s", str);
        }
        try {
            HttpResponse<String> requestAuthenticatedForJsonContent = getRequestAuthenticatedForJsonContent("%s/%s/last_dag_logs?dag_id=%s&task_id=%s%s", this.serviceURL, API_ENDPOINT, ingestionPipeline.getName(), str2, str3);
            if (requestAuthenticatedForJsonContent.statusCode() == 200) {
                return (Map) JsonUtils.readValue((String) requestAuthenticatedForJsonContent.body(), new TypeReference<Map<String, String>>() { // from class: org.openmetadata.service.clients.pipeline.airflow.AirflowRESTClient.2
                });
            }
            throw new PipelineServiceClientException(String.format("Failed to get last ingestion logs due to %s", requestAuthenticatedForJsonContent.body()));
        } catch (Exception e) {
            throw PipelineServiceClientException.byMessage("Failed to get last ingestion logs.", e.getMessage());
        }
    }

    private HttpResponse<String> getRequestAuthenticatedForJsonContent(String str, Object... objArr) throws IOException, InterruptedException {
        return this.client.send(authenticatedRequestBuilder(str, objArr).GET().build(), HttpResponse.BodyHandlers.ofString());
    }

    private HttpResponse<String> deleteRequestAuthenticatedForJsonContent(String str, Object... objArr) throws IOException, InterruptedException {
        return this.client.send(authenticatedRequestBuilder(str, objArr).DELETE().build(), HttpResponse.BodyHandlers.ofString());
    }

    private HttpRequest.Builder authenticatedRequestBuilder(String str, Object... objArr) {
        return HttpRequest.newBuilder(URI.create(String.format(str, objArr))).header("Content-Type", "application/json").header(JwtFilter.AUTHORIZATION_HEADER, getBasicAuthenticationHeader(this.username, this.password));
    }
}
