package org.elasticsearch.xpack.ml.datafeed.extractor.aggregation;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.PipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractor;
import org.elasticsearch.xpack.ml.datafeed.extractor.ExtractorUtils;

/* loaded from: input_file:x-pack-api-5.4.3.jar:org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractor.class */
class AggregationDataExtractor implements DataExtractor {
    private static final Logger LOGGER = Loggers.getLogger((Class<?>) AggregationDataExtractor.class);
    private static int BATCH_KEY_VALUE_PAIRS = 1000;
    private final Client client;
    private final AggregationDataExtractorContext context;
    private boolean hasNext = true;
    private boolean isCancelled = false;
    private LinkedList<Histogram.Bucket> histogramBuckets = null;

    /* JADX INFO: Access modifiers changed from: package-private */
    public AggregationDataExtractor(Client client, AggregationDataExtractorContext aggregationDataExtractorContext) {
        this.client = (Client) Objects.requireNonNull(client);
        this.context = (AggregationDataExtractorContext) Objects.requireNonNull(aggregationDataExtractorContext);
    }

    @Override // org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractor
    public boolean hasNext() {
        return this.hasNext;
    }

    @Override // org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractor
    public boolean isCancelled() {
        return this.isCancelled;
    }

    @Override // org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractor
    public void cancel() {
        LOGGER.trace("[{}] Data extractor received cancel request", this.context.jobId);
        this.isCancelled = true;
        this.hasNext = false;
    }

    @Override // org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractor
    public Optional<InputStream> next() throws IOException {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        if (this.histogramBuckets == null) {
            this.histogramBuckets = search();
        }
        return Optional.ofNullable(processNextBatch());
    }

    private LinkedList<Histogram.Bucket> search() throws IOException {
        if (this.histogramBuckets != null) {
            throw new IllegalStateException("search should only be performed once");
        }
        LOGGER.debug("[{}] Executing aggregated search", this.context.jobId);
        SearchResponse executeSearchRequest = executeSearchRequest(buildSearchRequest());
        ExtractorUtils.checkSearchWasSuccessful(this.context.jobId, executeSearchRequest);
        return new LinkedList<>(getHistogramBuckets(executeSearchRequest.getAggregations()));
    }

    protected SearchResponse executeSearchRequest(SearchRequestBuilder searchRequestBuilder) {
        return searchRequestBuilder.get();
    }

    private SearchRequestBuilder buildSearchRequest() {
        SearchRequestBuilder query = SearchAction.INSTANCE.newRequestBuilder((ElasticsearchClient) this.client).setIndices(this.context.indexes).setTypes(this.context.types).setSize(0).setQuery(ExtractorUtils.wrapInTimeRangeQuery(this.context.query, this.context.timeField, this.context.start, this.context.end));
        List<AggregationBuilder> aggregatorFactories = this.context.aggs.getAggregatorFactories();
        query.getClass();
        aggregatorFactories.forEach(query::addAggregation);
        List<PipelineAggregationBuilder> pipelineAggregatorFactories = this.context.aggs.getPipelineAggregatorFactories();
        query.getClass();
        pipelineAggregatorFactories.forEach(query::addAggregation);
        return query;
    }

    private List<? extends Histogram.Bucket> getHistogramBuckets(@Nullable Aggregations aggregations) {
        if (aggregations == null) {
            return Collections.emptyList();
        }
        List<Aggregation> asList = aggregations.asList();
        if (asList.isEmpty()) {
            return Collections.emptyList();
        }
        if (asList.size() > 1) {
            throw new IllegalArgumentException("Multiple top level aggregations not supported; found: " + asList.stream().map((v0) -> {
                return v0.getName();
            }).collect(Collectors.toList()));
        }
        Aggregation aggregation = asList.get(0);
        if (aggregation instanceof Histogram) {
            return ((Histogram) aggregation).getBuckets();
        }
        throw new IllegalArgumentException("Top level aggregation should be [histogram]");
    }

    private InputStream processNextBatch() throws IOException {
        if (this.histogramBuckets.isEmpty()) {
            this.hasNext = false;
            return null;
        }
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        AggregationToJsonProcessor aggregationToJsonProcessor = new AggregationToJsonProcessor(this.context.timeField, this.context.includeDocCount, byteArrayOutputStream);
        Throwable th = null;
        while (!this.histogramBuckets.isEmpty() && aggregationToJsonProcessor.getKeyValueCount() < BATCH_KEY_VALUE_PAIRS) {
            try {
                aggregationToJsonProcessor.process(this.histogramBuckets.removeFirst());
            } finally {
                if (aggregationToJsonProcessor != null) {
                    if (0 != 0) {
                        try {
                            aggregationToJsonProcessor.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        aggregationToJsonProcessor.close();
                    }
                }
            }
        }
        if (this.histogramBuckets.isEmpty()) {
            this.hasNext = false;
        }
        return new ByteArrayInputStream(byteArrayOutputStream.toByteArray());
    }
}
