/*
 * Decompiled with CFR 0.152.
 */
package org.openmetadata.service.search.opensearch;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.glassfish.jersey.internal.util.ExceptionUtils;
import org.openmetadata.schema.system.EntityError;
import org.openmetadata.schema.system.IndexingError;
import org.openmetadata.schema.system.StepStats;
import org.openmetadata.service.exception.SearchIndexException;
import org.openmetadata.service.search.SearchRepository;
import org.openmetadata.service.util.JsonUtils;
import org.openmetadata.service.workflows.interfaces.Sink;
import org.openmetadata.service.workflows.searchIndex.ReindexingUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import os.org.opensearch.action.DocWriteRequest;
import os.org.opensearch.action.bulk.BulkRequest;
import os.org.opensearch.action.bulk.BulkResponse;
import os.org.opensearch.client.RequestOptions;

public class OpenSearchIndexSink
implements Sink<BulkRequest, BulkResponse> {
    private static final Logger LOG = LoggerFactory.getLogger(OpenSearchIndexSink.class);
    private final StepStats stats = new StepStats();
    private final SearchRepository searchRepository;
    private final int maxPayLoadSizeInBytes;

    public OpenSearchIndexSink(SearchRepository repository, int total, int maxPayLoadSizeInBytes) {
        this.searchRepository = repository;
        this.maxPayLoadSizeInBytes = maxPayLoadSizeInBytes;
        this.stats.withTotalRecords(Integer.valueOf(total)).withSuccessRecords(Integer.valueOf(0)).withFailedRecords(Integer.valueOf(0));
    }

    @Override
    public BulkResponse write(BulkRequest data, Map<String, Object> contextData) throws SearchIndexException {
        LOG.debug("[OsSearchIndexSink] Processing a Batch of Size: {}", (Object)data.numberOfActions());
        try {
            List entityNames = Optional.ofNullable(contextData.get("entityNameList")).orElse(Collections.emptyList());
            ArrayList<EntityError> entityErrorList = new ArrayList<EntityError>();
            BulkResponse response = null;
            BulkRequest bufferData = new BulkRequest();
            long requestIndex = 0L;
            for (DocWriteRequest request : data.requests()) {
                long requestSize = new BulkRequest().add(request).estimatedSizeInBytes();
                if (requestSize > (long)this.maxPayLoadSizeInBytes) {
                    entityErrorList.add(new EntityError().withMessage("Entity size exceeds elastic search maximum payload size").withEntity(entityNames.get(Math.toIntExact(requestIndex))));
                    ++requestIndex;
                    continue;
                }
                if (bufferData.estimatedSizeInBytes() + requestSize > (long)this.maxPayLoadSizeInBytes) {
                    response = this.searchRepository.getSearchClient().bulk(bufferData, RequestOptions.DEFAULT);
                    entityErrorList.addAll(ReindexingUtil.getErrorsFromBulkResponse(response));
                    bufferData = new BulkRequest();
                }
                bufferData.add(request);
                ++requestIndex;
            }
            if (!bufferData.requests().isEmpty()) {
                response = this.searchRepository.getSearchClient().bulk(bufferData, RequestOptions.DEFAULT);
                entityErrorList.addAll(ReindexingUtil.getErrorsFromBulkResponse(response));
            }
            LOG.debug("[OSSearchIndexSink] Batch Stats :- Submitted : {} Success: {} Failed: {}", new Object[]{data.numberOfActions(), data.numberOfActions() - entityErrorList.size(), entityErrorList.size()});
            this.updateStats(data.numberOfActions() - entityErrorList.size(), entityErrorList.size());
            if (!entityErrorList.isEmpty()) {
                throw new SearchIndexException(new IndexingError().withErrorSource(IndexingError.ErrorSource.SINK).withSubmittedCount(Integer.valueOf(data.numberOfActions())).withSuccessCount(Integer.valueOf(data.numberOfActions() - entityErrorList.size())).withFailedCount(Integer.valueOf(entityErrorList.size())).withMessage(String.format("Issues in Sink To Elastic Search: %s", entityErrorList)).withFailedEntities(entityErrorList));
            }
            return response;
        }
        catch (SearchIndexException ex) {
            this.updateStats(ex.getIndexingError().getSuccessCount(), ex.getIndexingError().getFailedCount());
            throw ex;
        }
        catch (Exception e) {
            IndexingError indexingError = new IndexingError().withErrorSource(IndexingError.ErrorSource.SINK).withSubmittedCount(Integer.valueOf(data.numberOfActions())).withSuccessCount(Integer.valueOf(0)).withFailedCount(Integer.valueOf(data.numberOfActions())).withMessage(String.format("Issue in Sink to Elastic Search: %s", e.getMessage())).withStackTrace(ExceptionUtils.exceptionStackTraceAsString((Throwable)e));
            LOG.debug("[OSSearchIndexSink] Failed, Details : {}", (Object)JsonUtils.pojoToJson(indexingError));
            this.updateStats(0, data.numberOfActions());
            throw new SearchIndexException(indexingError);
        }
    }

    @Override
    public void updateStats(int currentSuccess, int currentFailed) {
        ReindexingUtil.getUpdatedStats(this.stats, currentSuccess, currentFailed);
    }

    @Override
    public StepStats getStats() {
        return this.stats;
    }
}

