package org.graylog.events.processor.aggregation;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.collect.UnmodifiableIterator;
import com.google.common.primitives.Ints;
import com.google.inject.assistedinject.Assisted;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.inject.Inject;
import org.apache.logging.log4j.util.Strings;
import org.graylog.events.conditions.BooleanNumberConditionsVisitor;
import org.graylog.events.event.Event;
import org.graylog.events.event.EventFactory;
import org.graylog.events.event.EventOriginContext;
import org.graylog.events.event.EventWithContext;
import org.graylog.events.processor.DBEventProcessorStateService;
import org.graylog.events.processor.EventConsumer;
import org.graylog.events.processor.EventDefinition;
import org.graylog.events.processor.EventProcessor;
import org.graylog.events.processor.EventProcessorDependencyCheck;
import org.graylog.events.processor.EventProcessorException;
import org.graylog.events.processor.EventProcessorParameters;
import org.graylog.events.processor.EventProcessorPreconditionException;
import org.graylog.events.processor.aggregation.AggregationSearch;
import org.graylog.events.search.MoreSearch;
import org.graylog.plugins.views.search.errors.ParameterExpansionError;
import org.graylog.plugins.views.search.errors.SearchException;
import org.graylog2.configuration.HttpConfiguration;
import org.graylog2.indexer.messages.Messages;
import org.graylog2.indexer.results.ResultMessage;
import org.graylog2.plugin.Message;
import org.graylog2.plugin.MessageSummary;
import org.graylog2.plugin.indexer.searches.timeranges.AbsoluteRange;
import org.graylog2.streams.StreamImpl;
import org.graylog2.streams.StreamService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/graylog/events/processor/aggregation/AggregationEventProcessor.class */
public class AggregationEventProcessor implements EventProcessor {
    private static final Logger LOG = LoggerFactory.getLogger(AggregationEventProcessor.class);
    private final EventDefinition eventDefinition;
    private final AggregationEventProcessorConfig config;
    private final AggregationSearch.Factory aggregationSearchFactory;
    private final EventProcessorDependencyCheck dependencyCheck;
    private final DBEventProcessorStateService stateService;
    private final MoreSearch moreSearch;
    private final StreamService streamService;
    private final Messages messages;

    /* loaded from: input_file:org/graylog/events/processor/aggregation/AggregationEventProcessor$Factory.class */
    public interface Factory extends EventProcessor.Factory<AggregationEventProcessor> {
        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.graylog.events.processor.EventProcessor.Factory
        AggregationEventProcessor create(EventDefinition eventDefinition);
    }

    @Inject
    public AggregationEventProcessor(@Assisted EventDefinition eventDefinition, AggregationSearch.Factory factory, EventProcessorDependencyCheck eventProcessorDependencyCheck, DBEventProcessorStateService dBEventProcessorStateService, MoreSearch moreSearch, StreamService streamService, Messages messages) {
        this.eventDefinition = eventDefinition;
        this.config = (AggregationEventProcessorConfig) eventDefinition.config();
        this.aggregationSearchFactory = factory;
        this.dependencyCheck = eventProcessorDependencyCheck;
        this.stateService = dBEventProcessorStateService;
        this.moreSearch = moreSearch;
        this.streamService = streamService;
        this.messages = messages;
    }

    @Override // org.graylog.events.processor.EventProcessor
    public void createEvents(EventFactory eventFactory, EventProcessorParameters eventProcessorParameters, EventConsumer<List<EventWithContext>> eventConsumer) throws EventProcessorException {
        AggregationEventProcessorParameters aggregationEventProcessorParameters = (AggregationEventProcessorParameters) eventProcessorParameters;
        if (!this.dependencyCheck.hasMessagesIndexedUpTo(aggregationEventProcessorParameters.timerange().getTo())) {
            throw new EventProcessorPreconditionException(String.format(Locale.ROOT, "Couldn't run aggregation <%s/%s> for timerange <%s to %s> because required messages haven't been indexed, yet.", this.eventDefinition.title(), this.eventDefinition.id(), aggregationEventProcessorParameters.timerange().getFrom(), aggregationEventProcessorParameters.timerange().getTo()), this.eventDefinition);
        }
        LOG.debug("Creating events for config={} parameters={}", this.config, aggregationEventProcessorParameters);
        try {
            if (this.config.series().isEmpty()) {
                filterSearch(eventFactory, aggregationEventProcessorParameters, eventConsumer);
            } else {
                aggregatedSearch(eventFactory, aggregationEventProcessorParameters, eventConsumer);
            }
        } catch (SearchException e) {
            if (e.error() instanceof ParameterExpansionError) {
                String format = String.format(Locale.ROOT, "Couldn't run aggregation <%s/%s>  because parameters failed to expand: %s", this.eventDefinition.title(), this.eventDefinition.id(), e.error().description());
                LOG.error(format);
                throw new EventProcessorPreconditionException(format, this.eventDefinition, e);
            }
        }
        this.stateService.setState(this.eventDefinition.id(), aggregationEventProcessorParameters.timerange().getFrom(), aggregationEventProcessorParameters.timerange().getTo());
    }

    @Override // org.graylog.events.processor.EventProcessor
    public void sourceMessagesForEvent(Event event, Consumer<List<MessageSummary>> consumer, long j) throws EventProcessorException {
        if (!this.config.series().isEmpty()) {
            AtomicLong atomicLong = new AtomicLong(0L);
            this.moreSearch.scrollQuery(this.config.query(), this.config.streams(), this.config.queryParameters(), AbsoluteRange.create(event.getTimerangeStart(), event.getTimerangeEnd()), Math.min(500, Ints.saturatedCast(j)), (list, atomicBoolean) -> {
                ArrayList newArrayList = Lists.newArrayList();
                Iterator it = list.iterator();
                while (true) {
                    if (!it.hasNext()) {
                        break;
                    }
                    ResultMessage resultMessage = (ResultMessage) it.next();
                    if (atomicLong.incrementAndGet() > j) {
                        atomicBoolean.set(false);
                        break;
                    } else {
                        newArrayList.add(new MessageSummary(resultMessage.getIndex(), resultMessage.getMessage()));
                    }
                }
                consumer.accept(newArrayList);
            });
        } else {
            if (j <= 0) {
                return;
            }
            EventOriginContext.ESEventOriginContext orElseThrow = EventOriginContext.parseESContext(event.getOriginContext()).orElseThrow(() -> {
                return new EventProcessorException("Failed to parse origin context", false, this.eventDefinition);
            });
            try {
                ResultMessage resultMessage = this.messages.get(orElseThrow.messageId(), orElseThrow.indexName());
                consumer.accept(Lists.newArrayList(new MessageSummary[]{new MessageSummary(resultMessage.getIndex(), resultMessage.getMessage())}));
            } catch (IOException e) {
                throw new EventProcessorException("Failed to query origin context message", false, this.eventDefinition, (Throwable) e);
            }
        }
    }

    private Set<String> getStreams(AggregationEventProcessorParameters aggregationEventProcessorParameters) {
        return aggregationEventProcessorParameters.streams().isEmpty() ? this.config.streams() : aggregationEventProcessorParameters.streams();
    }

    private void filterSearch(EventFactory eventFactory, AggregationEventProcessorParameters aggregationEventProcessorParameters, EventConsumer<List<EventWithContext>> eventConsumer) throws EventProcessorException {
        this.moreSearch.scrollQuery(this.config.query(), getStreams(aggregationEventProcessorParameters), this.config.queryParameters(), aggregationEventProcessorParameters.timerange(), aggregationEventProcessorParameters.batchSize(), (list, atomicBoolean) -> {
            ImmutableList.Builder builder = ImmutableList.builder();
            Iterator it = list.iterator();
            while (it.hasNext()) {
                ResultMessage resultMessage = (ResultMessage) it.next();
                Message message = resultMessage.getMessage();
                Event createEvent = eventFactory.createEvent(this.eventDefinition, message.getTimestamp(), this.eventDefinition.title());
                createEvent.setOriginContext(EventOriginContext.elasticsearchMessage(resultMessage.getIndex(), message.getId()));
                Stream<String> filter = message.getStreamIds().stream().filter(str -> {
                    return getStreams(aggregationEventProcessorParameters).contains(str);
                });
                Objects.requireNonNull(createEvent);
                filter.forEach(createEvent::addSourceStream);
                builder.add(EventWithContext.create(createEvent, message));
            }
            eventConsumer.accept(builder.build());
        });
    }

    private void aggregatedSearch(EventFactory eventFactory, AggregationEventProcessorParameters aggregationEventProcessorParameters, EventConsumer<List<EventWithContext>> eventConsumer) throws EventProcessorException {
        AggregationResult doSearch = this.aggregationSearchFactory.create(this.config, aggregationEventProcessorParameters, "event-processor-aggregation-v1-" + this.eventDefinition.id(), this.eventDefinition).doSearch();
        if (doSearch.keyResults().isEmpty()) {
            LOG.debug("Aggregated search returned empty result set.");
        } else {
            LOG.debug("Got {} (total-aggregated-messages={}) results.", Integer.valueOf(doSearch.keyResults().size()), Long.valueOf(doSearch.totalAggregatedMessages()));
            eventConsumer.accept(eventsFromAggregationResult(eventFactory, aggregationEventProcessorParameters, doSearch));
        }
    }

    private boolean satisfiesConditions(AggregationKeyResult aggregationKeyResult) {
        ImmutableMap immutableMap = (ImmutableMap) aggregationKeyResult.seriesValues().stream().map(aggregationSeriesValue -> {
            return Maps.immutableEntry(aggregationSeriesValue.series().id(), Double.valueOf(aggregationSeriesValue.value()));
        }).collect(ImmutableMap.toImmutableMap((v0) -> {
            return v0.getKey();
        }, (v0) -> {
            return v0.getValue();
        }));
        if (this.config.conditions().isPresent()) {
            return ((Boolean) this.config.conditions().get().expression().map(expression -> {
                return (Boolean) expression.accept(new BooleanNumberConditionsVisitor(immutableMap));
            }).orElse(true)).booleanValue();
        }
        return true;
    }

    @VisibleForTesting
    ImmutableList<EventWithContext> eventsFromAggregationResult(EventFactory eventFactory, AggregationEventProcessorParameters aggregationEventProcessorParameters, AggregationResult aggregationResult) {
        ImmutableList.Builder builder = ImmutableList.builder();
        Set<String> streams = getStreams(aggregationEventProcessorParameters);
        Set<String> sourceStreams = (streams.isEmpty() && aggregationResult.sourceStreams().isEmpty()) ? (Set) this.streamService.loadAll().stream().map((v0) -> {
            return v0.getId();
        }).filter(str -> {
            return !StreamImpl.DEFAULT_EVENT_STREAM_IDS.contains(str);
        }).collect(Collectors.toSet()) : streams.isEmpty() ? aggregationResult.sourceStreams() : aggregationResult.sourceStreams().isEmpty() ? streams : Sets.intersection(streams, aggregationResult.sourceStreams());
        UnmodifiableIterator it = aggregationResult.keyResults().iterator();
        while (it.hasNext()) {
            AggregationKeyResult aggregationKeyResult = (AggregationKeyResult) it.next();
            if (satisfiesConditions(aggregationKeyResult)) {
                String join = Strings.join(aggregationKeyResult.key(), '|');
                String createEventMessageString = createEventMessageString(join, aggregationKeyResult);
                Event createEvent = eventFactory.createEvent(this.eventDefinition, aggregationResult.effectiveTimerange().to(), createEventMessageString);
                createEvent.setTimerangeStart(aggregationEventProcessorParameters.timerange().getFrom());
                createEvent.setTimerangeEnd(aggregationEventProcessorParameters.timerange().getTo());
                Objects.requireNonNull(createEvent);
                sourceStreams.forEach(createEvent::addSourceStream);
                HashMap hashMap = new HashMap();
                for (int i = 0; i < this.config.groupBy().size(); i++) {
                    hashMap.put(this.config.groupBy().get(i), aggregationKeyResult.key().get(i));
                }
                UnmodifiableIterator it2 = aggregationKeyResult.seriesValues().iterator();
                while (it2.hasNext()) {
                    AggregationSeriesValue aggregationSeriesValue = (AggregationSeriesValue) it2.next();
                    String lowerCase = aggregationSeriesValue.series().function().toString().toLowerCase(Locale.ROOT);
                    Optional<String> field = aggregationSeriesValue.series().field();
                    hashMap.put(field.isPresent() ? String.format(Locale.ROOT, "aggregation_value_%s_%s", lowerCase, field.get()) : String.format(Locale.ROOT, "aggregation_value_%s", lowerCase), Double.valueOf(aggregationSeriesValue.value()));
                }
                hashMap.put("aggregation_key", join);
                Message message = new Message(createEventMessageString, HttpConfiguration.PATH_WEB, aggregationResult.effectiveTimerange().to());
                message.addFields(hashMap);
                LOG.debug("Creating event {}/{} - {} {} ({})", new Object[]{this.eventDefinition.title(), this.eventDefinition.id(), aggregationKeyResult.key(), seriesString(aggregationKeyResult), hashMap});
                builder.add(EventWithContext.create(createEvent, message));
            } else {
                LOG.debug("Skipping result <{}> because the conditions <{}> don't match", aggregationKeyResult, this.config.conditions());
            }
        }
        return builder.build();
    }

    private String createEventMessageString(String str, AggregationKeyResult aggregationKeyResult) {
        StringBuilder append = new StringBuilder(this.eventDefinition.title()).append(": ");
        if (!aggregationKeyResult.key().isEmpty()) {
            append.append(str).append(" - ");
        }
        UnmodifiableIterator it = aggregationKeyResult.seriesValues().iterator();
        while (it.hasNext()) {
            AggregationSeriesValue aggregationSeriesValue = (AggregationSeriesValue) it.next();
            AggregationSeries series = aggregationSeriesValue.series();
            String lowerCase = series.function().toString().toLowerCase(Locale.ROOT);
            append.append(lowerCase).append("(").append(series.field().orElse(HttpConfiguration.PATH_WEB)).append(")");
            append.append("=").append(aggregationSeriesValue.value());
            append.append(" ");
        }
        return append.toString().trim();
    }

    private String seriesString(AggregationKeyResult aggregationKeyResult) {
        return (String) aggregationKeyResult.seriesValues().stream().map(aggregationSeriesValue -> {
            return String.format(Locale.ROOT, "%s(%s)=%s", aggregationSeriesValue.series().function().toString().toLowerCase(Locale.ROOT), aggregationSeriesValue.series().field().orElse(HttpConfiguration.PATH_WEB), Double.valueOf(aggregationSeriesValue.value()));
        }).collect(Collectors.joining(" "));
    }
}
