/*
 * Decompiled with CFR 0.152.
 */
package org.openmetadata.service.airflow;

import com.fasterxml.jackson.core.type.TypeReference;
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.List;
import java.util.Map;
import javax.ws.rs.core.Response;
import org.json.JSONObject;
import org.openmetadata.schema.api.configuration.airflow.AirflowConfiguration;
import org.openmetadata.schema.api.services.ingestionPipelines.TestServiceConnection;
import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline;
import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineStatus;
import org.openmetadata.service.exception.IngestionPipelineDeploymentException;
import org.openmetadata.service.exception.PipelineServiceClientException;
import org.openmetadata.service.util.JsonUtils;
import org.openmetadata.service.util.PipelineServiceClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AirflowRESTClient
extends PipelineServiceClient {
    private static final Logger LOG = LoggerFactory.getLogger(AirflowRESTClient.class);
    private static final String API_ENDPOINT = "api/v1/openmetadata";
    private static final String DAG_ID = "dag_id";

    public AirflowRESTClient(AirflowConfiguration airflowConfig) {
        super(airflowConfig.getUsername(), airflowConfig.getPassword(), airflowConfig.getApiEndpoint(), airflowConfig.getHostIp(), airflowConfig.getTimeout());
    }

    @Override
    public String deployPipeline(IngestionPipeline ingestionPipeline) {
        HttpResponse<String> response;
        try {
            String deployEndpoint = "%s/%s/deploy";
            String deployUrl = String.format(deployEndpoint, this.serviceURL, API_ENDPOINT);
            String pipelinePayload = JsonUtils.pojoToJson(ingestionPipeline);
            response = this.post(deployUrl, pipelinePayload);
            if (response.statusCode() == 200) {
                return response.body();
            }
        }
        catch (Exception e) {
            throw IngestionPipelineDeploymentException.byMessage(ingestionPipeline.getName(), e.getMessage());
        }
        throw new PipelineServiceClientException(String.format("%s Failed to deploy Ingestion Pipeline due to airflow API returned %s and response %s", ingestionPipeline.getName(), Response.Status.fromStatusCode((int)response.statusCode()), response.body()));
    }

    @Override
    public String deletePipeline(String pipelineName) {
        try {
            String deleteEndpoint = "%s/%s/delete?dag_id=%s";
            HttpResponse<String> response = this.deleteRequestAuthenticatedForJsonContent(deleteEndpoint, this.serviceURL, API_ENDPOINT, pipelineName);
            return response.body();
        }
        catch (Exception e) {
            LOG.error(String.format("Failed to delete Airflow Pipeline %s from Airflow DAGS", pipelineName));
            return null;
        }
    }

    @Override
    public String runPipeline(String pipelineName) {
        HttpResponse<String> response;
        try {
            String triggerEndPoint = "%s/%s/trigger";
            String triggerUrl = String.format(triggerEndPoint, this.serviceURL, API_ENDPOINT);
            JSONObject requestPayload = new JSONObject();
            requestPayload.put(DAG_ID, (Object)pipelineName);
            response = this.post(triggerUrl, requestPayload.toString());
            if (response.statusCode() == 200) {
                return response.body();
            }
        }
        catch (Exception e) {
            throw IngestionPipelineDeploymentException.byMessage(pipelineName, e.getMessage());
        }
        throw IngestionPipelineDeploymentException.byMessage(pipelineName, "Failed to trigger IngestionPipeline", Response.Status.fromStatusCode((int)response.statusCode()));
    }

    @Override
    public IngestionPipeline toggleIngestion(IngestionPipeline ingestionPipeline) {
        HttpResponse<String> response;
        try {
            JSONObject requestPayload = new JSONObject();
            requestPayload.put(DAG_ID, (Object)ingestionPipeline.getName());
            if (ingestionPipeline.getEnabled().equals(Boolean.TRUE)) {
                String toggleEndPoint = "%s/%s/disable";
                String toggleUrl = String.format(toggleEndPoint, this.serviceURL, API_ENDPOINT);
                response = this.post(toggleUrl, requestPayload.toString());
                if (response.statusCode() == 200) {
                    ingestionPipeline.setEnabled(Boolean.valueOf(false));
                    return ingestionPipeline;
                }
                if (response.statusCode() == 404) {
                    ingestionPipeline.setDeployed(Boolean.valueOf(false));
                    return ingestionPipeline;
                }
            } else {
                String toggleEndPoint = "%s/%s/enable";
                String toggleUrl = String.format(toggleEndPoint, this.serviceURL, API_ENDPOINT);
                response = this.post(toggleUrl, requestPayload.toString());
                if (response.statusCode() == 200) {
                    ingestionPipeline.setEnabled(Boolean.valueOf(true));
                    return ingestionPipeline;
                }
                if (response.statusCode() == 404) {
                    ingestionPipeline.setDeployed(Boolean.valueOf(false));
                    return ingestionPipeline;
                }
            }
        }
        catch (Exception e) {
            throw PipelineServiceClientException.byMessage(ingestionPipeline.getName(), e.getMessage());
        }
        throw PipelineServiceClientException.byMessage(ingestionPipeline.getName(), "Failed to toggle ingestion pipeline state", Response.Status.fromStatusCode((int)response.statusCode()));
    }

    @Override
    public IngestionPipeline getPipelineStatus(IngestionPipeline ingestionPipeline) {
        HttpResponse<String> response;
        try {
            String statusEndPoint = "%s/%s/status?dag_id=%s";
            response = this.getRequestAuthenticatedForJsonContent(statusEndPoint, this.serviceURL, API_ENDPOINT, ingestionPipeline.getName());
            if (response.statusCode() == 200) {
                List<PipelineStatus> statuses = JsonUtils.readObjects(response.body(), PipelineStatus.class);
                ingestionPipeline.setPipelineStatuses(statuses);
                ingestionPipeline.setDeployed(Boolean.valueOf(true));
                return ingestionPipeline;
            }
            if (response.statusCode() == 404) {
                ingestionPipeline.setDeployed(Boolean.valueOf(false));
            }
        }
        catch (Exception e) {
            throw PipelineServiceClientException.byMessage(ingestionPipeline.getName(), e.getMessage());
        }
        throw PipelineServiceClientException.byMessage(ingestionPipeline.getName(), "Failed to fetch ingestion pipeline runs", Response.Status.fromStatusCode((int)response.statusCode()));
    }

    @Override
    public Response getServiceStatus() {
        HttpResponse<String> response;
        try {
            response = this.getRequestNoAuthForJsonContent("%s/%s/health", this.serviceURL, API_ENDPOINT);
            if (response.statusCode() == 200) {
                JSONObject responseJSON = new JSONObject(response.body());
                String ingestionVersion = responseJSON.getString("version");
                if (Boolean.TRUE.equals(this.validServerClientVersions(ingestionVersion))) {
                    Map<String, String> status = Map.of("status", "healthy");
                    return Response.status((int)200, (String)status.toString()).build();
                }
                Map<String, String> status = Map.of("status", "unhealthy", "reason", String.format("Got Ingestion Version %s and Server Version %s. They should match.", ingestionVersion, SERVER_VERSION));
                return Response.status((int)500, (String)status.toString()).build();
            }
        }
        catch (Exception e) {
            throw PipelineServiceClientException.byMessage("Failed to get REST status.", e.getMessage());
        }
        throw new PipelineServiceClientException(String.format("Failed to get REST status due to %s.", response.body()));
    }

    @Override
    public HttpResponse<String> testConnection(TestServiceConnection testServiceConnection) {
        HttpResponse<String> response;
        try {
            String statusEndPoint = "%s/%s/test_connection";
            String statusUrl = String.format(statusEndPoint, this.serviceURL, API_ENDPOINT);
            String connectionPayload = JsonUtils.pojoToJson(testServiceConnection);
            response = this.post(statusUrl, connectionPayload);
            if (response.statusCode() == 200) {
                return response;
            }
        }
        catch (Exception e) {
            throw PipelineServiceClientException.byMessage("Failed to test connection.", e.getMessage());
        }
        throw new PipelineServiceClientException(String.format("Failed to test connection due to %s", response.body()));
    }

    @Override
    public HttpResponse<String> killIngestion(IngestionPipeline ingestionPipeline) {
        HttpResponse<String> response;
        try {
            String killEndPoint = "%s/%s/kill";
            String killUrl = String.format(killEndPoint, this.serviceURL, API_ENDPOINT);
            JSONObject requestPayload = new JSONObject();
            requestPayload.put(DAG_ID, (Object)ingestionPipeline.getName());
            response = this.post(killUrl, requestPayload.toString());
            if (response.statusCode() == 200) {
                return response;
            }
        }
        catch (Exception e) {
            throw PipelineServiceClientException.byMessage("Failed to kill running workflows", e.getMessage());
        }
        throw new PipelineServiceClientException(String.format("Failed to kill running workflows due to %s", response.body()));
    }

    @Override
    public Map<String, String> requestGetHostIp() {
        HttpResponse<String> response;
        try {
            response = this.getRequestAuthenticatedForJsonContent("%s/%s/ip", this.serviceURL, API_ENDPOINT);
            if (response.statusCode() == 200) {
                return JsonUtils.readValue(response.body(), new TypeReference<Map<String, String>>(){});
            }
        }
        catch (Exception e) {
            throw PipelineServiceClientException.byMessage("Failed to get Pipeline Service host IP.", e.getMessage());
        }
        throw new PipelineServiceClientException(String.format("Failed to get Pipeline Service host IP due to %s", response.body()));
    }

    @Override
    public Map<String, String> getLastIngestionLogs(IngestionPipeline ingestionPipeline) {
        HttpResponse<String> response;
        try {
            response = this.getRequestAuthenticatedForJsonContent("%s/%s/last_dag_logs?dag_id=%s", this.serviceURL, API_ENDPOINT, ingestionPipeline.getName());
            if (response.statusCode() == 200) {
                return JsonUtils.readValue(response.body(), new TypeReference<Map<String, String>>(){});
            }
        }
        catch (Exception e) {
            throw PipelineServiceClientException.byMessage("Failed to get last ingestion logs.", e.getMessage());
        }
        throw new PipelineServiceClientException(String.format("Failed to get last ingestion logs due to %s", response.body()));
    }

    private HttpResponse<String> getRequestAuthenticatedForJsonContent(String stringUrlFormat, Object ... stringReplacement) throws IOException, InterruptedException {
        HttpRequest request = this.authenticatedRequestBuilder(stringUrlFormat, stringReplacement).GET().build();
        return this.client.send(request, HttpResponse.BodyHandlers.ofString());
    }

    private HttpResponse<String> deleteRequestAuthenticatedForJsonContent(String stringUrlFormat, Object ... stringReplacement) throws IOException, InterruptedException {
        HttpRequest request = this.authenticatedRequestBuilder(stringUrlFormat, stringReplacement).DELETE().build();
        return this.client.send(request, HttpResponse.BodyHandlers.ofString());
    }

    private HttpRequest.Builder authenticatedRequestBuilder(String stringUrlFormat, Object ... stringReplacement) {
        String url = String.format(stringUrlFormat, stringReplacement);
        return HttpRequest.newBuilder(URI.create(url)).header("Content-Type", "application/json").header("Authorization", this.getBasicAuthenticationHeader(this.username, this.password));
    }

    private HttpResponse<String> getRequestNoAuthForJsonContent(String stringUrlFormat, Object ... stringReplacement) throws IOException, InterruptedException {
        String url = String.format(stringUrlFormat, stringReplacement);
        HttpRequest request = HttpRequest.newBuilder(URI.create(url)).header("Content-Type", "application/json").GET().build();
        return this.client.send(request, HttpResponse.BodyHandlers.ofString());
    }
}

