/*
 * Decompiled with CFR 0.152.
 */
package org.openmetadata.service.monitoring;

import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.logging.log4j.util.Strings;
import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline;
import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineStatus;
import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineStatusType;
import org.openmetadata.schema.monitoring.EventMonitorProvider;
import org.openmetadata.schema.type.ChangeDescription;
import org.openmetadata.schema.type.ChangeEvent;
import org.openmetadata.service.monitoring.EventMonitor;
import org.openmetadata.service.monitoring.EventMonitorConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchClient;
import software.amazon.awssdk.services.cloudwatch.CloudWatchClientBuilder;
import software.amazon.awssdk.services.cloudwatch.model.CloudWatchException;
import software.amazon.awssdk.services.cloudwatch.model.Dimension;
import software.amazon.awssdk.services.cloudwatch.model.MetricDatum;
import software.amazon.awssdk.services.cloudwatch.model.PutMetricDataRequest;
import software.amazon.awssdk.services.cloudwatch.model.StandardUnit;

public class CloudwatchEventMonitor
extends EventMonitor {
    private static final Logger LOG = LoggerFactory.getLogger(CloudwatchEventMonitor.class);
    public static final String ACCESS_KEY_ID = "accessKeyId";
    public static final String SECRET_ACCESS_KEY = "secretAccessKey";
    public static final String REGION = "region";
    public static final String INGESTION_PIPELINE_CREATED = "INGESTION_PIPELINE_CREATED";
    public static final String INGESTION_PIPELINE_UPDATED = "INGESTION_PIPELINE_";
    public static final String INGESTION_PIPELINE_DELETED = "INGESTION_PIPELINE_DELETED";
    public static final String NAMESPACE = "INGESTION_PIPELINE";
    public static final String PIPELINE_STATUS = "pipelineStatus";
    private final CloudWatchClient client;

    public CloudwatchEventMonitor(EventMonitorProvider eventMonitorProvider, EventMonitorConfiguration config, String clusterPrefix) {
        super(eventMonitorProvider, config, clusterPrefix);
        if (config != null && config.getParameters() != null && !Strings.isBlank((String)config.getParameters().getOrDefault(REGION, ""))) {
            String region = config.getParameters().getOrDefault(REGION, "");
            String accessKeyId = config.getParameters().getOrDefault(ACCESS_KEY_ID, "");
            String secretAccessKey = config.getParameters().getOrDefault(SECRET_ACCESS_KEY, "");
            Object credentialsProvider = Strings.isBlank((String)accessKeyId) && Strings.isBlank((String)secretAccessKey) ? DefaultCredentialsProvider.create() : StaticCredentialsProvider.create((AwsCredentials)AwsBasicCredentials.create((String)accessKeyId, (String)secretAccessKey));
            this.client = (CloudWatchClient)((CloudWatchClientBuilder)((CloudWatchClientBuilder)CloudWatchClient.builder().region(Region.of((String)region))).credentialsProvider((AwsCredentialsProvider)credentialsProvider)).build();
        } else {
            this.client = CloudWatchClient.create();
        }
    }

    @Override
    protected void pushMetric(ChangeEvent event) {
        List<PutMetricDataRequest> requests = this.buildMetricRequest(event);
        requests.forEach(arg_0 -> ((CloudWatchClient)this.client).putMetricData(arg_0));
    }

    protected List<PutMetricDataRequest> buildMetricRequest(ChangeEvent event) {
        String fqn = event.getEntityFullyQualifiedName();
        IngestionPipeline ingestionPipeline = (IngestionPipeline)event.getEntity();
        String pipelineType = ingestionPipeline.getPipelineType().toString();
        Long timestamp = event.getTimestamp();
        List<PutMetricDataRequest> metricRequests = Collections.emptyList();
        try {
            switch (event.getEventType()) {
                case ENTITY_CREATED: {
                    metricRequests = List.of(this.logPipelineCreated(fqn, pipelineType, timestamp));
                    break;
                }
                case ENTITY_UPDATED: {
                    metricRequests = this.logPipelineUpdated(fqn, pipelineType, timestamp, event.getChangeDescription());
                    break;
                }
                case ENTITY_DELETED: 
                case ENTITY_SOFT_DELETED: {
                    metricRequests = List.of(this.logPipelineDeleted(fqn, pipelineType, timestamp));
                    break;
                }
                default: {
                    throw new IllegalArgumentException("Invalid EventType " + event.getEventType());
                }
            }
        }
        catch (IllegalArgumentException | CloudWatchException e) {
            LOG.error("Failed to publish IngestionPipeline Cloudwatch metric due to " + e.getMessage());
        }
        return metricRequests;
    }

    protected PutMetricDataRequest logPipelineCreated(String fqn, String pipelineType, Long timestamp) {
        return this.logPipelineStatus(fqn, pipelineType, timestamp, INGESTION_PIPELINE_CREATED);
    }

    protected PutMetricDataRequest logPipelineDeleted(String fqn, String pipelineType, Long timestamp) {
        return this.logPipelineStatus(fqn, pipelineType, timestamp, INGESTION_PIPELINE_DELETED);
    }

    protected List<PutMetricDataRequest> logPipelineUpdated(String fqn, String pipelineType, Long timestamp, ChangeDescription changeDescription) {
        return changeDescription.getFieldsUpdated().stream().map(change -> {
            if (change.getName().equals(PIPELINE_STATUS) && change.getNewValue() != null) {
                PipelineStatus pipelineStatus = (PipelineStatus)change.getNewValue();
                return this.logPipelineStatus(fqn, pipelineType, timestamp, this.getMetricNameByStatus(pipelineStatus.getPipelineState()));
            }
            LOG.debug("Ignoring Ingestion Pipeline change type " + change.getName());
            return null;
        }).collect(Collectors.toList());
    }

    private String getMetricNameByStatus(PipelineStatusType statusType) {
        return INGESTION_PIPELINE_UPDATED + statusType.toString().toUpperCase();
    }

    protected PutMetricDataRequest logPipelineStatus(String fqn, String pipelineType, Long timestamp, String metricName) {
        Dimension dimension = (Dimension)Dimension.builder().name(pipelineType).value(fqn).build();
        Instant instant = Instant.ofEpochMilli(timestamp);
        MetricDatum datum = (MetricDatum)MetricDatum.builder().metricName(metricName).unit(StandardUnit.COUNT).value(Double.valueOf(1.0)).timestamp(instant).dimensions(new Dimension[]{dimension}).build();
        return (PutMetricDataRequest)PutMetricDataRequest.builder().namespace(this.buildMetricNamespace(NAMESPACE)).metricData(new MetricDatum[]{datum}).build();
    }

    @Override
    protected void close() {
        this.client.close();
    }
}

