/*
 * Decompiled with CFR 0.152.
 */
package org.openmetadata.service.clients.pipeline.airflow;

import com.fasterxml.jackson.core.type.TypeReference;
import java.io.IOException;
import java.net.URI;
import java.net.URL;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.security.KeyStoreException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import javax.net.ssl.SSLContext;
import javax.ws.rs.core.Response;
import org.json.JSONObject;
import org.openmetadata.schema.ServiceEntityInterface;
import org.openmetadata.schema.api.configuration.pipelineServiceClient.PipelineServiceClientConfiguration;
import org.openmetadata.schema.entity.app.App;
import org.openmetadata.schema.entity.app.AppMarketPlaceDefinition;
import org.openmetadata.schema.entity.automations.Workflow;
import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline;
import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineServiceClientResponse;
import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineStatus;
import org.openmetadata.sdk.PipelineServiceClient;
import org.openmetadata.sdk.exception.PipelineServiceClientException;
import org.openmetadata.service.exception.IngestionPipelineDeploymentException;
import org.openmetadata.service.util.JsonUtils;
import org.openmetadata.service.util.SSLUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AirflowRESTClient
extends PipelineServiceClient {
    private static final Logger LOG = LoggerFactory.getLogger(AirflowRESTClient.class);
    private static final String PLATFORM = "Airflow";
    private static final String USERNAME_KEY = "username";
    private static final String PASSWORD_KEY = "password";
    private static final String TIMEOUT_KEY = "timeout";
    private static final String TRUSTSTORE_PATH_KEY = "truststorePath";
    private static final String TRUSTSTORE_PASSWORD_KEY = "truststorePassword";
    protected final String username;
    protected final String password;
    protected final HttpClient client;
    protected final URL serviceURL;
    private static final String API_ENDPOINT = "api/v1/openmetadata";
    private static final String DAG_ID = "dag_id";

    public AirflowRESTClient(PipelineServiceClientConfiguration config) throws KeyStoreException {
        super(config);
        this.setPlatform(PLATFORM);
        this.username = (String)config.getParameters().getAdditionalProperties().get(USERNAME_KEY);
        this.password = (String)config.getParameters().getAdditionalProperties().get(PASSWORD_KEY);
        this.serviceURL = this.validateServiceURL(config.getApiEndpoint());
        SSLContext sslContext = AirflowRESTClient.createAirflowSSLContext(config);
        HttpClient.Builder clientBuilder = HttpClient.newBuilder().version(HttpClient.Version.HTTP_1_1).connectTimeout(Duration.ofSeconds(((Integer)config.getParameters().getAdditionalProperties().get(TIMEOUT_KEY)).intValue()));
        this.client = sslContext == null ? clientBuilder.build() : clientBuilder.sslContext(sslContext).build();
    }

    private static SSLContext createAirflowSSLContext(PipelineServiceClientConfiguration config) throws KeyStoreException {
        String truststorePath = (String)config.getParameters().getAdditionalProperties().get(TRUSTSTORE_PATH_KEY);
        String truststorePassword = (String)config.getParameters().getAdditionalProperties().get(TRUSTSTORE_PASSWORD_KEY);
        return SSLUtil.createSSLContext(truststorePath, truststorePassword, PLATFORM);
    }

    public final HttpResponse<String> post(String endpoint, String payload, boolean authenticate) throws IOException, InterruptedException {
        HttpRequest.Builder requestBuilder = HttpRequest.newBuilder(URI.create(endpoint)).header("Content-Type", "application/json").POST(HttpRequest.BodyPublishers.ofString(payload));
        if (authenticate) {
            requestBuilder.header("Authorization", this.getBasicAuthenticationHeader(this.username, this.password));
        }
        return this.client.send(requestBuilder.build(), HttpResponse.BodyHandlers.ofString());
    }

    public final HttpResponse<String> post(String endpoint, String payload) throws IOException, InterruptedException {
        return this.post(endpoint, payload, true);
    }

    public PipelineServiceClientResponse deployPipeline(IngestionPipeline ingestionPipeline, ServiceEntityInterface service) {
        HttpResponse<String> response;
        try {
            String deployEndpoint = "%s/%s/deploy";
            String deployUrl = String.format(deployEndpoint, this.serviceURL, API_ENDPOINT);
            String pipelinePayload = JsonUtils.pojoToJson(ingestionPipeline);
            response = this.post(deployUrl, pipelinePayload);
            if (response.statusCode() == 200) {
                ingestionPipeline.setDeployed(Boolean.valueOf(true));
                return new PipelineServiceClientResponse().withCode(Integer.valueOf(200)).withReason(response.body()).withPlatform(this.getPlatform());
            }
        }
        catch (Exception e) {
            throw IngestionPipelineDeploymentException.byMessage(ingestionPipeline.getName(), e.getMessage());
        }
        throw new PipelineServiceClientException(String.format("%s Failed to deploy Ingestion Pipeline due to airflow API returned %s and response %s", ingestionPipeline.getName(), Response.Status.fromStatusCode((int)response.statusCode()), response.body()));
    }

    public PipelineServiceClientResponse deletePipeline(IngestionPipeline ingestionPipeline) {
        String pipelineName = ingestionPipeline.getName();
        try {
            String deleteEndpoint = "%s/%s/delete?dag_id=%s";
            HttpResponse<String> response = this.deleteRequestAuthenticatedForJsonContent(deleteEndpoint, this.serviceURL, API_ENDPOINT, pipelineName);
            if (response.statusCode() == 200) {
                return new PipelineServiceClientResponse().withCode(Integer.valueOf(200)).withPlatform(this.getPlatform());
            }
        }
        catch (Exception e) {
            LOG.error(String.format("Failed to delete Airflow Pipeline %s from Airflow DAGS", pipelineName));
        }
        return new PipelineServiceClientResponse().withCode(Integer.valueOf(500)).withReason(String.format("Failed to delete Airflow Pipeline %s from Airflow DAGS", pipelineName)).withPlatform(this.getPlatform());
    }

    public PipelineServiceClientResponse runPipeline(IngestionPipeline ingestionPipeline, ServiceEntityInterface service) {
        HttpResponse<String> response;
        String pipelineName = ingestionPipeline.getName();
        try {
            String triggerEndPoint = "%s/%s/trigger";
            String triggerUrl = String.format(triggerEndPoint, this.serviceURL, API_ENDPOINT);
            JSONObject requestPayload = new JSONObject();
            requestPayload.put(DAG_ID, (Object)pipelineName);
            response = this.post(triggerUrl, requestPayload.toString());
            if (response.statusCode() == 200) {
                return new PipelineServiceClientResponse().withCode(Integer.valueOf(200)).withReason(response.body()).withPlatform(this.getPlatform());
            }
        }
        catch (Exception e) {
            throw IngestionPipelineDeploymentException.byMessage(pipelineName, e.getMessage());
        }
        throw IngestionPipelineDeploymentException.byMessage(pipelineName, "Failed to trigger IngestionPipeline", Response.Status.fromStatusCode((int)response.statusCode()));
    }

    public PipelineServiceClientResponse toggleIngestion(IngestionPipeline ingestionPipeline) {
        HttpResponse<String> response;
        try {
            JSONObject requestPayload = new JSONObject();
            requestPayload.put(DAG_ID, (Object)ingestionPipeline.getName());
            if (ingestionPipeline.getEnabled().equals(Boolean.TRUE)) {
                String toggleEndPoint = "%s/%s/disable";
                String toggleUrl = String.format(toggleEndPoint, this.serviceURL, API_ENDPOINT);
                response = this.post(toggleUrl, requestPayload.toString());
                if (response.statusCode() == 200) {
                    ingestionPipeline.setEnabled(Boolean.valueOf(false));
                    return new PipelineServiceClientResponse().withCode(Integer.valueOf(200)).withReason(response.body()).withPlatform(this.getPlatform());
                }
                if (response.statusCode() == 404) {
                    ingestionPipeline.setDeployed(Boolean.valueOf(false));
                    return new PipelineServiceClientResponse().withCode(Integer.valueOf(404)).withReason(response.body()).withPlatform(this.getPlatform());
                }
            } else {
                String toggleEndPoint = "%s/%s/enable";
                String toggleUrl = String.format(toggleEndPoint, this.serviceURL, API_ENDPOINT);
                response = this.post(toggleUrl, requestPayload.toString());
                if (response.statusCode() == 200) {
                    ingestionPipeline.setEnabled(Boolean.valueOf(true));
                    return new PipelineServiceClientResponse().withCode(Integer.valueOf(200)).withReason(response.body()).withPlatform(this.getPlatform());
                }
                if (response.statusCode() == 404) {
                    ingestionPipeline.setDeployed(Boolean.valueOf(false));
                    return new PipelineServiceClientResponse().withCode(Integer.valueOf(404)).withReason(response.body()).withPlatform(this.getPlatform());
                }
            }
        }
        catch (Exception e) {
            throw PipelineServiceClientException.byMessage((String)ingestionPipeline.getName(), (String)e.getMessage());
        }
        throw PipelineServiceClientException.byMessage((String)ingestionPipeline.getName(), (String)"Failed to toggle ingestion pipeline state", (Response.Status)Response.Status.fromStatusCode((int)response.statusCode()));
    }

    public List<PipelineStatus> getQueuedPipelineStatusInternal(IngestionPipeline ingestionPipeline) {
        HttpResponse<String> response;
        try {
            String statusEndPoint = "%s/%s/status?dag_id=%s&only_queued=true";
            response = this.getRequestAuthenticatedForJsonContent(statusEndPoint, this.serviceURL, API_ENDPOINT, ingestionPipeline.getName());
            if (response.statusCode() == 200) {
                return JsonUtils.readObjects(response.body(), PipelineStatus.class);
            }
        }
        catch (Exception e) {
            throw PipelineServiceClientException.byMessage((String)ingestionPipeline.getName(), (String)e.getMessage());
        }
        LOG.error(String.format("Got status code [%s] trying to get queued statuses: [%s]", response.statusCode(), response.body()));
        return new ArrayList<PipelineStatus>();
    }

    public PipelineServiceClientResponse getServiceStatusInternal() {
        try {
            HttpResponse<String> response = this.getRequestAuthenticatedForJsonContent("%s/%s/health-auth", this.serviceURL, API_ENDPOINT);
            if (response.statusCode() == 200) {
                JSONObject responseJSON = new JSONObject(response.body());
                String ingestionVersion = responseJSON.getString("version");
                if (Boolean.TRUE.equals(this.validServerClientVersions(ingestionVersion))) {
                    return this.buildHealthyStatus(ingestionVersion);
                }
                return this.buildUnhealthyStatus(this.buildVersionMismatchErrorMessage(ingestionVersion, SERVER_VERSION));
            }
            if (response.statusCode() == 401 || response.statusCode() == 403) {
                return this.buildUnhealthyStatus(String.format("Authentication failed for user [%s] trying to access the Airflow APIs.", this.username));
            }
            if (response.statusCode() == 404) {
                return this.buildUnhealthyStatus("Airflow APIs not found. Please follow the installation guide.");
            }
            return this.buildUnhealthyStatus(String.format("Unexpected status response: code [%s] - [%s]", response.statusCode(), response.body()));
        }
        catch (Exception e) {
            return this.buildUnhealthyStatus(String.format("Failed to get REST status due to [%s].", e.getMessage()));
        }
    }

    public PipelineServiceClientResponse runAutomationsWorkflow(Workflow workflow) {
        HttpResponse<String> response;
        try {
            String automationsEndpoint = "%s/%s/run_automation";
            String automationsUrl = String.format(automationsEndpoint, this.serviceURL, API_ENDPOINT);
            String workflowPayload = JsonUtils.pojoToJson(workflow);
            response = this.post(automationsUrl, workflowPayload);
            if (response.statusCode() == 200) {
                return new PipelineServiceClientResponse().withCode(Integer.valueOf(200)).withReason(response.body()).withPlatform(this.getPlatform());
            }
        }
        catch (Exception e) {
            throw IngestionPipelineDeploymentException.byMessage(workflow.getName(), e.getMessage());
        }
        throw new PipelineServiceClientException(String.format("%s Failed to trigger workflow due to airflow API returned %s and response %s", workflow.getName(), Response.Status.fromStatusCode((int)response.statusCode()), response.body()));
    }

    public PipelineServiceClientResponse runApplicationFlow(App application) {
        return this.sendPost("run_application", application);
    }

    public PipelineServiceClientResponse validateAppRegistration(AppMarketPlaceDefinition appMarketPlaceDefinition) {
        return new PipelineServiceClientResponse().withCode(Integer.valueOf(200)).withReason("Success").withPlatform(this.getPlatform());
    }

    private PipelineServiceClientResponse sendPost(String endpoint, Object request) {
        HttpResponse<String> response;
        String workflowPayload = JsonUtils.pojoToJson(request);
        try {
            String automationsEndpoint = "%s/%s/%s";
            String automationsUrl = String.format(automationsEndpoint, this.serviceURL, API_ENDPOINT, endpoint);
            response = this.post(automationsUrl, workflowPayload);
            if (response.statusCode() == 200) {
                return new PipelineServiceClientResponse().withCode(Integer.valueOf(200)).withReason(response.body()).withPlatform(this.getPlatform());
            }
        }
        catch (Exception e) {
            throw IngestionPipelineDeploymentException.byMessage(workflowPayload, e.getMessage());
        }
        throw new PipelineServiceClientException(String.format("%s Failed to trigger flow due to airflow API returned %s and response %s", workflowPayload, Response.Status.fromStatusCode((int)response.statusCode()), response.body()));
    }

    public PipelineServiceClientResponse killIngestion(IngestionPipeline ingestionPipeline) {
        HttpResponse<String> response;
        try {
            String killEndPoint = "%s/%s/kill";
            String killUrl = String.format(killEndPoint, this.serviceURL, API_ENDPOINT);
            JSONObject requestPayload = new JSONObject();
            requestPayload.put(DAG_ID, (Object)ingestionPipeline.getName());
            response = this.post(killUrl, requestPayload.toString());
            if (response.statusCode() == 200) {
                return new PipelineServiceClientResponse().withCode(Integer.valueOf(200)).withReason(response.body()).withPlatform(this.getPlatform());
            }
        }
        catch (Exception e) {
            throw PipelineServiceClientException.byMessage((String)"Failed to kill running workflows", (String)e.getMessage());
        }
        throw new PipelineServiceClientException(String.format("Failed to kill running workflows due to %s", response.body()));
    }

    public Map<String, String> requestGetHostIp() {
        HttpResponse<String> response;
        try {
            response = this.getRequestAuthenticatedForJsonContent("%s/%s/ip", this.serviceURL, API_ENDPOINT);
            if (response.statusCode() == 200) {
                return JsonUtils.readValue(response.body(), new TypeReference<Map<String, String>>(){});
            }
        }
        catch (Exception e) {
            throw PipelineServiceClientException.byMessage((String)"Failed to get Pipeline Service host IP.", (String)e.getMessage());
        }
        throw new PipelineServiceClientException(String.format("Failed to get Pipeline Service host IP due to %s", response.body()));
    }

    public Map<String, String> getLastIngestionLogs(IngestionPipeline ingestionPipeline, String after) {
        HttpResponse<String> response;
        String taskId = (String)TYPE_TO_TASK.get(ingestionPipeline.getPipelineType().toString());
        String afterParam = "";
        if (after != null) {
            afterParam = String.format("&after=%s", after);
        }
        try {
            response = this.getRequestAuthenticatedForJsonContent("%s/%s/last_dag_logs?dag_id=%s&task_id=%s%s", this.serviceURL, API_ENDPOINT, ingestionPipeline.getName(), taskId, afterParam);
            if (response.statusCode() == 200) {
                return JsonUtils.readValue(response.body(), new TypeReference<Map<String, String>>(){});
            }
        }
        catch (Exception e) {
            throw PipelineServiceClientException.byMessage((String)"Failed to get last ingestion logs.", (String)e.getMessage());
        }
        throw new PipelineServiceClientException(String.format("Failed to get last ingestion logs due to %s", response.body()));
    }

    private HttpResponse<String> getRequestAuthenticatedForJsonContent(String stringUrlFormat, Object ... stringReplacement) throws IOException, InterruptedException {
        HttpRequest request = this.authenticatedRequestBuilder(stringUrlFormat, stringReplacement).GET().build();
        return this.client.send(request, HttpResponse.BodyHandlers.ofString());
    }

    private HttpResponse<String> deleteRequestAuthenticatedForJsonContent(String stringUrlFormat, Object ... stringReplacement) throws IOException, InterruptedException {
        HttpRequest request = this.authenticatedRequestBuilder(stringUrlFormat, stringReplacement).DELETE().build();
        return this.client.send(request, HttpResponse.BodyHandlers.ofString());
    }

    private HttpRequest.Builder authenticatedRequestBuilder(String stringUrlFormat, Object ... stringReplacement) {
        String url = String.format(stringUrlFormat, stringReplacement);
        return HttpRequest.newBuilder(URI.create(url)).header("Content-Type", "application/json").header("Authorization", this.getBasicAuthenticationHeader(this.username, this.password));
    }
}

