/*
 * Decompiled with CFR 0.152.
 */
package org.openmetadata.service.clients.pipeline.airflow;

import com.fasterxml.jackson.core.type.TypeReference;
import java.io.IOException;
import java.net.URI;
import java.net.URL;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import javax.ws.rs.core.Response;
import org.json.JSONObject;
import org.openmetadata.schema.ServiceEntityInterface;
import org.openmetadata.schema.api.configuration.pipelineServiceClient.PipelineServiceClientConfiguration;
import org.openmetadata.schema.entity.automations.Workflow;
import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline;
import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineStatus;
import org.openmetadata.sdk.PipelineServiceClient;
import org.openmetadata.sdk.exception.PipelineServiceClientException;
import org.openmetadata.service.exception.IngestionPipelineDeploymentException;
import org.openmetadata.service.util.JsonUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AirflowRESTClient
extends PipelineServiceClient {
    private static final Logger LOG = LoggerFactory.getLogger(AirflowRESTClient.class);
    private static final String USERNAME_KEY = "username";
    private static final String PASSWORD_KEY = "password";
    private static final String TIMEOUT_KEY = "timeout";
    protected final String username;
    protected final String password;
    protected final HttpClient client;
    protected final URL serviceURL;
    private static final String API_ENDPOINT = "api/v1/openmetadata";
    private static final String DAG_ID = "dag_id";

    public AirflowRESTClient(PipelineServiceClientConfiguration config) {
        super(config);
        this.username = (String)config.getParameters().getAdditionalProperties().get(USERNAME_KEY);
        this.password = (String)config.getParameters().getAdditionalProperties().get(PASSWORD_KEY);
        this.serviceURL = this.validateServiceURL(config.getApiEndpoint());
        this.client = HttpClient.newBuilder().version(HttpClient.Version.HTTP_1_1).connectTimeout(Duration.ofSeconds(((Integer)config.getParameters().getAdditionalProperties().get(TIMEOUT_KEY)).intValue())).build();
    }

    public final HttpResponse<String> post(String endpoint, String payload, boolean authenticate) throws IOException, InterruptedException {
        HttpRequest.Builder requestBuilder = HttpRequest.newBuilder(URI.create(endpoint)).header("Content-Type", "application/json").POST(HttpRequest.BodyPublishers.ofString(payload));
        if (authenticate) {
            requestBuilder.header("Authorization", this.getBasicAuthenticationHeader(this.username, this.password));
        }
        return this.client.send(requestBuilder.build(), HttpResponse.BodyHandlers.ofString());
    }

    public final HttpResponse<String> post(String endpoint, String payload) throws IOException, InterruptedException {
        return this.post(endpoint, payload, true);
    }

    public String deployPipeline(IngestionPipeline ingestionPipeline, ServiceEntityInterface service) {
        HttpResponse<String> response;
        try {
            String deployEndpoint = "%s/%s/deploy";
            String deployUrl = String.format(deployEndpoint, this.serviceURL, API_ENDPOINT);
            String pipelinePayload = JsonUtils.pojoToJson(ingestionPipeline);
            response = this.post(deployUrl, pipelinePayload);
            if (response.statusCode() == 200) {
                ingestionPipeline.setDeployed(Boolean.valueOf(true));
                return response.body();
            }
        }
        catch (Exception e) {
            throw IngestionPipelineDeploymentException.byMessage(ingestionPipeline.getName(), e.getMessage());
        }
        throw new PipelineServiceClientException(String.format("%s Failed to deploy Ingestion Pipeline due to airflow API returned %s and response %s", ingestionPipeline.getName(), Response.Status.fromStatusCode((int)response.statusCode()), response.body()));
    }

    public String deletePipeline(IngestionPipeline ingestionPipeline) {
        String pipelineName = ingestionPipeline.getName();
        try {
            String deleteEndpoint = "%s/%s/delete?dag_id=%s";
            HttpResponse<String> response = this.deleteRequestAuthenticatedForJsonContent(deleteEndpoint, this.serviceURL, API_ENDPOINT, pipelineName);
            return response.body();
        }
        catch (Exception e) {
            LOG.error(String.format("Failed to delete Airflow Pipeline %s from Airflow DAGS", pipelineName));
            return null;
        }
    }

    public String runPipeline(IngestionPipeline ingestionPipeline, ServiceEntityInterface service) {
        HttpResponse<String> response;
        String pipelineName = ingestionPipeline.getName();
        try {
            String triggerEndPoint = "%s/%s/trigger";
            String triggerUrl = String.format(triggerEndPoint, this.serviceURL, API_ENDPOINT);
            JSONObject requestPayload = new JSONObject();
            requestPayload.put(DAG_ID, (Object)pipelineName);
            response = this.post(triggerUrl, requestPayload.toString());
            if (response.statusCode() == 200) {
                return response.body();
            }
        }
        catch (Exception e) {
            throw IngestionPipelineDeploymentException.byMessage(pipelineName, e.getMessage());
        }
        throw IngestionPipelineDeploymentException.byMessage(pipelineName, "Failed to trigger IngestionPipeline", Response.Status.fromStatusCode((int)response.statusCode()));
    }

    public IngestionPipeline toggleIngestion(IngestionPipeline ingestionPipeline) {
        HttpResponse<String> response;
        try {
            JSONObject requestPayload = new JSONObject();
            requestPayload.put(DAG_ID, (Object)ingestionPipeline.getName());
            if (ingestionPipeline.getEnabled().equals(Boolean.TRUE)) {
                String toggleEndPoint = "%s/%s/disable";
                String toggleUrl = String.format(toggleEndPoint, this.serviceURL, API_ENDPOINT);
                response = this.post(toggleUrl, requestPayload.toString());
                if (response.statusCode() == 200) {
                    ingestionPipeline.setEnabled(Boolean.valueOf(false));
                    return ingestionPipeline;
                }
                if (response.statusCode() == 404) {
                    ingestionPipeline.setDeployed(Boolean.valueOf(false));
                    return ingestionPipeline;
                }
            } else {
                String toggleEndPoint = "%s/%s/enable";
                String toggleUrl = String.format(toggleEndPoint, this.serviceURL, API_ENDPOINT);
                response = this.post(toggleUrl, requestPayload.toString());
                if (response.statusCode() == 200) {
                    ingestionPipeline.setEnabled(Boolean.valueOf(true));
                    return ingestionPipeline;
                }
                if (response.statusCode() == 404) {
                    ingestionPipeline.setDeployed(Boolean.valueOf(false));
                    return ingestionPipeline;
                }
            }
        }
        catch (Exception e) {
            throw PipelineServiceClientException.byMessage((String)ingestionPipeline.getName(), (String)e.getMessage());
        }
        throw PipelineServiceClientException.byMessage((String)ingestionPipeline.getName(), (String)"Failed to toggle ingestion pipeline state", (Response.Status)Response.Status.fromStatusCode((int)response.statusCode()));
    }

    public List<PipelineStatus> getQueuedPipelineStatus(IngestionPipeline ingestionPipeline) {
        HttpResponse<String> response;
        try {
            String statusEndPoint = "%s/%s/status?dag_id=%s&only_queued=true";
            response = this.getRequestAuthenticatedForJsonContent(statusEndPoint, this.serviceURL, API_ENDPOINT, ingestionPipeline.getName());
            if (response.statusCode() == 200) {
                return JsonUtils.readObjects(response.body(), PipelineStatus.class);
            }
        }
        catch (Exception e) {
            throw PipelineServiceClientException.byMessage((String)ingestionPipeline.getName(), (String)e.getMessage());
        }
        throw PipelineServiceClientException.byMessage((String)ingestionPipeline.getName(), (String)"Failed to fetch ingestion pipeline runs", (Response.Status)Response.Status.fromStatusCode((int)response.statusCode()));
    }

    public Response getServiceStatus() {
        HttpResponse<String> response;
        try {
            response = this.getRequestNoAuthForJsonContent(this.serviceURL, API_ENDPOINT);
            if (response.statusCode() == 200) {
                JSONObject responseJSON = new JSONObject(response.body());
                String ingestionVersion = responseJSON.getString("version");
                if (Boolean.TRUE.equals(this.validServerClientVersions(ingestionVersion))) {
                    Map<String, String> status = Map.of("status", "healthy");
                    return Response.status((int)200, (String)status.toString()).build();
                }
                Map<String, String> status = Map.of("status", "unhealthy", "reason", String.format("Got Ingestion Version %s and Server Version %s. They should match.", ingestionVersion, SERVER_VERSION));
                return Response.status((int)500, (String)status.toString()).build();
            }
        }
        catch (Exception e) {
            throw PipelineServiceClientException.byMessage((String)"Failed to get REST status.", (String)e.getMessage());
        }
        throw new PipelineServiceClientException(String.format("Failed to get REST status due to %s.", response.body()));
    }

    public Response runAutomationsWorkflow(Workflow workflow) {
        HttpResponse<String> response;
        try {
            String automationsEndpoint = "%s/%s/run_automation";
            String automationsUrl = String.format(automationsEndpoint, this.serviceURL, API_ENDPOINT);
            String workflowPayload = JsonUtils.pojoToJson(workflow);
            response = this.post(automationsUrl, workflowPayload);
            if (response.statusCode() == 200) {
                return Response.status((int)200, (String)response.body()).build();
            }
        }
        catch (Exception e) {
            throw IngestionPipelineDeploymentException.byMessage(workflow.getName(), e.getMessage());
        }
        throw new PipelineServiceClientException(String.format("%s Failed to trigger workflow due to airflow API returned %s and response %s", workflow.getName(), Response.Status.fromStatusCode((int)response.statusCode()), response.body()));
    }

    public Response killIngestion(IngestionPipeline ingestionPipeline) {
        HttpResponse<String> response;
        try {
            String killEndPoint = "%s/%s/kill";
            String killUrl = String.format(killEndPoint, this.serviceURL, API_ENDPOINT);
            JSONObject requestPayload = new JSONObject();
            requestPayload.put(DAG_ID, (Object)ingestionPipeline.getName());
            response = this.post(killUrl, requestPayload.toString());
            if (response.statusCode() == 200) {
                return Response.status((int)200, (String)response.body()).build();
            }
        }
        catch (Exception e) {
            throw PipelineServiceClientException.byMessage((String)"Failed to kill running workflows", (String)e.getMessage());
        }
        throw new PipelineServiceClientException(String.format("Failed to kill running workflows due to %s", response.body()));
    }

    public Map<String, String> requestGetHostIp() {
        HttpResponse<String> response;
        try {
            response = this.getRequestAuthenticatedForJsonContent("%s/%s/ip", this.serviceURL, API_ENDPOINT);
            if (response.statusCode() == 200) {
                return JsonUtils.readValue(response.body(), new TypeReference<Map<String, String>>(){});
            }
        }
        catch (Exception e) {
            throw PipelineServiceClientException.byMessage((String)"Failed to get Pipeline Service host IP.", (String)e.getMessage());
        }
        throw new PipelineServiceClientException(String.format("Failed to get Pipeline Service host IP due to %s", response.body()));
    }

    public Map<String, String> getLastIngestionLogs(IngestionPipeline ingestionPipeline, String after) {
        HttpResponse<String> response;
        String taskId = (String)TYPE_TO_TASK.get(ingestionPipeline.getPipelineType().toString());
        String afterParam = "";
        if (after != null) {
            afterParam = String.format("&after=%s", after);
        }
        try {
            response = this.getRequestAuthenticatedForJsonContent("%s/%s/last_dag_logs?dag_id=%s&task_id=%s%s", this.serviceURL, API_ENDPOINT, ingestionPipeline.getName(), taskId, afterParam);
            if (response.statusCode() == 200) {
                return JsonUtils.readValue(response.body(), new TypeReference<Map<String, String>>(){});
            }
        }
        catch (Exception e) {
            throw PipelineServiceClientException.byMessage((String)"Failed to get last ingestion logs.", (String)e.getMessage());
        }
        throw new PipelineServiceClientException(String.format("Failed to get last ingestion logs due to %s", response.body()));
    }

    private HttpResponse<String> getRequestAuthenticatedForJsonContent(String stringUrlFormat, Object ... stringReplacement) throws IOException, InterruptedException {
        HttpRequest request = this.authenticatedRequestBuilder(stringUrlFormat, stringReplacement).GET().build();
        return this.client.send(request, HttpResponse.BodyHandlers.ofString());
    }

    private HttpResponse<String> deleteRequestAuthenticatedForJsonContent(String stringUrlFormat, Object ... stringReplacement) throws IOException, InterruptedException {
        HttpRequest request = this.authenticatedRequestBuilder(stringUrlFormat, stringReplacement).DELETE().build();
        return this.client.send(request, HttpResponse.BodyHandlers.ofString());
    }

    private HttpRequest.Builder authenticatedRequestBuilder(String stringUrlFormat, Object ... stringReplacement) {
        String url = String.format(stringUrlFormat, stringReplacement);
        return HttpRequest.newBuilder(URI.create(url)).header("Content-Type", "application/json").header("Authorization", this.getBasicAuthenticationHeader(this.username, this.password));
    }

    private HttpResponse<String> getRequestNoAuthForJsonContent(Object ... stringReplacement) throws IOException, InterruptedException {
        String url = String.format("%s/%s/health", stringReplacement);
        HttpRequest request = HttpRequest.newBuilder(URI.create(url)).header("Content-Type", "application/json").GET().build();
        return this.client.send(request, HttpResponse.BodyHandlers.ofString());
    }
}

