/*
 * Decompiled with CFR 0.152.
 */
package org.openmetadata.service.resources.elasticSearch;

import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Date;
import java.util.UUID;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.openmetadata.schema.settings.EventPublisherJob;
import org.openmetadata.schema.settings.FailureDetails;
import org.openmetadata.schema.settings.Stats;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.socket.WebSocketManager;
import org.openmetadata.service.util.JsonUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BulkProcessorListener
implements BulkProcessor.Listener {
    private static final Logger LOG = LoggerFactory.getLogger(BulkProcessorListener.class);
    private volatile boolean updateTotalRequest = true;
    private volatile int totalSuccessCount = 0;
    private volatile int totalFailedCount = 0;
    private volatile int totalRequests = 0;
    private final CollectionDAO dao;
    private final UUID startedBy;

    public BulkProcessorListener(CollectionDAO dao, UUID startedBy) {
        this.dao = dao;
        this.startedBy = startedBy;
        this.resetCounters();
    }

    public void beforeBulk(long executionId, BulkRequest bulkRequest) {
        int numberOfActions = bulkRequest.numberOfActions();
        LOG.info("Executing bulk [{}] with {} requests", (Object)executionId, (Object)numberOfActions);
    }

    public void afterBulk(long executionId, BulkRequest bulkRequest, BulkResponse bulkResponse) {
        try {
            boolean batchHasFailures = false;
            int failedCount = 0;
            FailureDetails failureDetails = new FailureDetails();
            for (BulkItemResponse bulkItemResponse : bulkResponse) {
                if (!bulkItemResponse.isFailed()) continue;
                BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
                failureDetails.setLastFailedReason(String.format("Index Type: [%s], Reason: [%s] \n Trace : [%s]", failure.getIndex(), failure.getMessage(), ExceptionUtils.getStackTrace((Throwable)failure.getCause())));
                failureDetails.setContext(String.format("Entities Info : \n ID : [%s] ", failure.getId()));
                ++failedCount;
                batchHasFailures = true;
            }
            this.updateFailedAndSuccess(failedCount, bulkResponse.getItems().length - failedCount);
            EventPublisherJob.Status status = batchHasFailures ? EventPublisherJob.Status.ACTIVEWITHERROR : EventPublisherJob.Status.ACTIVE;
            Stats stats = new Stats().withFailed(Integer.valueOf(this.totalFailedCount)).withSuccess(Integer.valueOf(this.totalSuccessCount)).withTotal(Integer.valueOf(this.totalRequests));
            FailureDetails hasFailureDetails = batchHasFailures ? failureDetails : null;
            this.updateElasticSearchStatus(status, hasFailureDetails, stats);
        }
        catch (RuntimeException e) {
            LOG.error("Error in processing Bulk");
        }
    }

    public void afterBulk(long executionId, BulkRequest bulkRequest, Throwable throwable) {
        LOG.error("Failed to execute bulk", throwable);
        this.updateFailedAndSuccess(bulkRequest.numberOfActions(), 0);
        EventPublisherJob.Status status = EventPublisherJob.Status.ACTIVEWITHERROR;
        Stats stats = new Stats().withFailed(Integer.valueOf(this.totalFailedCount)).withSuccess(Integer.valueOf(this.totalSuccessCount)).withTotal(Integer.valueOf(this.totalRequests));
        FailureDetails hasFailureDetails = new FailureDetails().withContext(String.format("Bulk Requests : [%s] ", bulkRequest.getDescription())).withLastFailedReason(String.format("Batch Failed Completely. \n Reason : [%s] \n Trace : [%s] ", throwable.getMessage(), ExceptionUtils.getStackTrace((Throwable)throwable)));
        this.updateElasticSearchStatus(status, hasFailureDetails, stats);
    }

    public synchronized void addRequests(int count) {
        if (this.updateTotalRequest) {
            this.totalRequests += count;
        }
        this.updateTotalRequest = false;
    }

    public synchronized void allowTotalRequestUpdate() {
        this.updateTotalRequest = true;
    }

    public synchronized void resetCounters() {
        this.totalRequests = 0;
        this.totalFailedCount = 0;
        this.totalSuccessCount = 0;
        this.updateTotalRequest = true;
    }

    public synchronized void updateFailedAndSuccess(int failedCount, int successCount) {
        this.totalFailedCount += failedCount;
        this.totalSuccessCount += successCount;
    }

    public void updateElasticSearchStatus(EventPublisherJob.Status status, FailureDetails failDetails, Stats newStats) {
        try {
            long updateTime = Date.from(LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant()).getTime();
            String recordString = this.dao.entityExtensionTimeSeriesDao().getExtension("eventPublisher:ElasticSearch:BATCH", "service.eventPublisher");
            EventPublisherJob lastRecord = JsonUtils.readValue(recordString, EventPublisherJob.class);
            long originalLastUpdate = lastRecord.getTimestamp();
            if (this.totalRequests == this.totalFailedCount + this.totalSuccessCount) {
                lastRecord.setStatus(EventPublisherJob.Status.IDLE);
            } else {
                lastRecord.setStatus(status);
            }
            lastRecord.setTimestamp(Long.valueOf(updateTime));
            if (failDetails != null) {
                lastRecord.setFailureDetails(new FailureDetails().withContext(failDetails.getContext()).withLastFailedAt(Long.valueOf(updateTime)).withLastFailedReason(failDetails.getLastFailedReason()));
            }
            lastRecord.setStats(newStats);
            this.dao.entityExtensionTimeSeriesDao().update("eventPublisher:ElasticSearch:BATCH", "service.eventPublisher", JsonUtils.pojoToJson(lastRecord), originalLastUpdate);
            WebSocketManager.getInstance().sendToOne(this.startedBy, "jobStatus", JsonUtils.pojoToJson(lastRecord));
        }
        catch (Exception e) {
            LOG.error("Failed to Update Elastic Search Job Info");
        }
    }
}

