package org.graylog.events.processor.aggregation;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.UnmodifiableIterator;
import com.google.common.primitives.Ints;
import com.google.inject.assistedinject.Assisted;
import jakarta.inject.Inject;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.graylog.events.conditions.BooleanNumberConditionsVisitor;
import org.graylog.events.event.Event;
import org.graylog.events.event.EventFactory;
import org.graylog.events.event.EventOriginContext;
import org.graylog.events.event.EventReplayInfo;
import org.graylog.events.event.EventWithContext;
import org.graylog.events.notifications.EventNotificationModelData;
import org.graylog.events.processor.DBEventProcessorStateService;
import org.graylog.events.processor.EventConsumer;
import org.graylog.events.processor.EventDefinition;
import org.graylog.events.processor.EventProcessor;
import org.graylog.events.processor.EventProcessorDependencyCheck;
import org.graylog.events.processor.EventProcessorException;
import org.graylog.events.processor.EventProcessorParameters;
import org.graylog.events.processor.EventProcessorPreconditionException;
import org.graylog.events.processor.EventStreamService;
import org.graylog.events.processor.aggregation.AggregationSearch;
import org.graylog.events.search.MoreSearch;
import org.graylog.plugins.views.search.elasticsearch.ElasticsearchQueryString;
import org.graylog.plugins.views.search.errors.ParameterExpansionError;
import org.graylog.plugins.views.search.errors.SearchException;
import org.graylog.plugins.views.search.rest.PermittedStreams;
import org.graylog.plugins.views.search.searchtypes.pivot.HasField;
import org.graylog.plugins.views.search.searchtypes.pivot.SeriesSpec;
import org.graylog.plugins.views.search.searchtypes.pivot.series.HasOptionalField;
import org.graylog2.indexer.ElasticsearchException;
import org.graylog2.indexer.messages.Messages;
import org.graylog2.indexer.results.ResultMessage;
import org.graylog2.notifications.Notification;
import org.graylog2.notifications.NotificationService;
import org.graylog2.plugin.Message;
import org.graylog2.plugin.MessageFactory;
import org.graylog2.plugin.MessageSummary;
import org.graylog2.plugin.indexer.searches.timeranges.AbsoluteRange;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/graylog/events/processor/aggregation/AggregationEventProcessor.class */
public class AggregationEventProcessor implements EventProcessor {
    private static final Logger LOG = LoggerFactory.getLogger(AggregationEventProcessor.class);
    private final EventDefinition eventDefinition;
    private final AggregationEventProcessorConfig config;
    private final AggregationSearch.Factory aggregationSearchFactory;
    private final EventProcessorDependencyCheck dependencyCheck;
    private final DBEventProcessorStateService stateService;
    private final MoreSearch moreSearch;
    private final EventStreamService eventStreamService;
    private final Messages messages;
    private final NotificationService notificationService;
    private final PermittedStreams permittedStreams;
    private final Set<EventQuerySearchTypeSupplier> eventQueryModifiers;
    private final MessageFactory messageFactory;

    /* loaded from: input_file:org/graylog/events/processor/aggregation/AggregationEventProcessor$EventLimitReachedException.class */
    private static class EventLimitReachedException extends RuntimeException {
        private EventLimitReachedException() {
        }
    }

    /* loaded from: input_file:org/graylog/events/processor/aggregation/AggregationEventProcessor$Factory.class */
    public interface Factory extends EventProcessor.Factory<AggregationEventProcessor> {
        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.graylog.events.processor.EventProcessor.Factory
        AggregationEventProcessor create(EventDefinition eventDefinition);
    }

    @Inject
    public AggregationEventProcessor(@Assisted EventDefinition eventDefinition, AggregationSearch.Factory factory, EventProcessorDependencyCheck eventProcessorDependencyCheck, DBEventProcessorStateService dBEventProcessorStateService, MoreSearch moreSearch, EventStreamService eventStreamService, Messages messages, NotificationService notificationService, PermittedStreams permittedStreams, Set<EventQuerySearchTypeSupplier> set, MessageFactory messageFactory) {
        this.eventDefinition = eventDefinition;
        this.config = (AggregationEventProcessorConfig) eventDefinition.config();
        this.aggregationSearchFactory = factory;
        this.dependencyCheck = eventProcessorDependencyCheck;
        this.stateService = dBEventProcessorStateService;
        this.moreSearch = moreSearch;
        this.eventStreamService = eventStreamService;
        this.messages = messages;
        this.notificationService = notificationService;
        this.permittedStreams = permittedStreams;
        this.eventQueryModifiers = set;
        this.messageFactory = messageFactory;
    }

    @Override // org.graylog.events.processor.EventProcessor
    public void createEvents(EventFactory eventFactory, EventProcessorParameters eventProcessorParameters, EventConsumer<List<EventWithContext>> eventConsumer) throws EventProcessorException {
        AggregationEventProcessorParameters aggregationEventProcessorParameters = (AggregationEventProcessorParameters) eventProcessorParameters;
        if (!this.dependencyCheck.hasMessagesIndexedUpTo(aggregationEventProcessorParameters.timerange())) {
            throw new EventProcessorPreconditionException(String.format(Locale.ROOT, "Couldn't run aggregation <%s/%s> for timerange <%s to %s> because required messages haven't been indexed, yet.", this.eventDefinition.title(), this.eventDefinition.id(), aggregationEventProcessorParameters.timerange().getFrom(), aggregationEventProcessorParameters.timerange().getTo()), this.eventDefinition);
        }
        LOG.debug("Creating events for config={} parameters={}", this.config, aggregationEventProcessorParameters);
        try {
            if (this.config.series().isEmpty()) {
                filterSearch(eventFactory, aggregationEventProcessorParameters, eventConsumer);
            } else {
                aggregatedSearch(eventFactory, aggregationEventProcessorParameters, eventConsumer);
            }
        } catch (SearchException e) {
            if (e.error() instanceof ParameterExpansionError) {
                String format = String.format(Locale.ROOT, "Couldn't run aggregation <%s/%s>  because parameters failed to expand: %s", this.eventDefinition.title(), this.eventDefinition.id(), e.error().description());
                LOG.error(format);
                throw new EventProcessorPreconditionException(format, this.eventDefinition, e);
            }
        } catch (ElasticsearchException e2) {
            String format2 = String.format(Locale.ROOT, "Couldn't run aggregation <%s/%s> because of search error: %s", this.eventDefinition.title(), this.eventDefinition.id(), e2.getMessage());
            LOG.error(format2);
            throw new EventProcessorPreconditionException(format2, this.eventDefinition, e2);
        }
        this.stateService.setState(this.eventDefinition.id(), aggregationEventProcessorParameters.timerange().getFrom(), aggregationEventProcessorParameters.timerange().getTo());
    }

    @Override // org.graylog.events.processor.EventProcessor
    public void sourceMessagesForEvent(Event event, Consumer<List<MessageSummary>> consumer, long j) throws EventProcessorException {
        if (!this.config.series().isEmpty()) {
            AtomicLong atomicLong = new AtomicLong(0L);
            MoreSearch.ScrollCallback scrollCallback = (list, atomicBoolean) -> {
                ArrayList newArrayList = Lists.newArrayList();
                Iterator it = list.iterator();
                while (true) {
                    if (!it.hasNext()) {
                        break;
                    }
                    ResultMessage resultMessage = (ResultMessage) it.next();
                    if (atomicLong.incrementAndGet() > j) {
                        atomicBoolean.set(false);
                        break;
                    } else {
                        newArrayList.add(new MessageSummary(resultMessage.getIndex(), resultMessage.getMessage()));
                    }
                }
                consumer.accept(newArrayList);
            };
            ElasticsearchQueryString concatenate = ElasticsearchQueryString.of(this.config.query()).concatenate(groupByQueryString(event));
            LOG.debug("scrollQueryString: {}", concatenate);
            this.moreSearch.scrollQuery(concatenate.queryString(), this.config.streams(), this.config.filters(), this.config.queryParameters(), AbsoluteRange.create(event.getTimerangeStart(), event.getTimerangeEnd()), Math.min(500, Ints.saturatedCast(j)), scrollCallback);
            return;
        }
        if (j <= 0) {
            return;
        }
        EventOriginContext.ESEventOriginContext orElseThrow = EventOriginContext.parseESContext(event.getOriginContext()).orElseThrow(() -> {
            return new EventProcessorException("Failed to parse origin context", false, this.eventDefinition);
        });
        try {
            ResultMessage resultMessage = this.messages.get(orElseThrow.messageId(), orElseThrow.indexName());
            consumer.accept(Lists.newArrayList(new MessageSummary[]{new MessageSummary(resultMessage.getIndex(), resultMessage.getMessage())}));
        } catch (IOException e) {
            throw new EventProcessorException("Failed to query origin context message", false, this.eventDefinition, (Throwable) e);
        }
    }

    private ElasticsearchQueryString groupByQueryString(Event event) {
        ElasticsearchQueryString empty = ElasticsearchQueryString.empty();
        if (!this.config.groupBy().isEmpty()) {
            for (String str : event.getGroupByFields().keySet()) {
                empty = empty.concatenate(ElasticsearchQueryString.of(str + ":\"" + MoreSearch.luceneEscape(event.getGroupByFields().get(str)) + "\""));
            }
        }
        return empty;
    }

    private Set<String> getStreams(AggregationEventProcessorParameters aggregationEventProcessorParameters) {
        return aggregationEventProcessorParameters.streams().isEmpty() ? this.config.streams() : aggregationEventProcessorParameters.streams();
    }

    private void filterSearch(EventFactory eventFactory, AggregationEventProcessorParameters aggregationEventProcessorParameters, EventConsumer<List<EventWithContext>> eventConsumer) throws EventProcessorException {
        Set<String> streams = getStreams(aggregationEventProcessorParameters);
        if (streams.isEmpty()) {
            streams = new HashSet((Collection<? extends String>) this.permittedStreams.loadAllMessageStreams(str -> {
                return true;
            }));
        }
        AtomicInteger atomicInteger = new AtomicInteger(0);
        try {
            this.moreSearch.scrollQuery(this.config.query(), streams, this.config.filters(), this.config.queryParameters(), aggregationEventProcessorParameters.timerange(), aggregationEventProcessorParameters.batchSize(), (list, atomicBoolean) -> {
                ImmutableList.Builder builder = ImmutableList.builder();
                Iterator it = list.iterator();
                while (it.hasNext()) {
                    ResultMessage resultMessage = (ResultMessage) it.next();
                    Message message = resultMessage.getMessage();
                    Event createEvent = eventFactory.createEvent(this.eventDefinition, message.getTimestamp(), this.eventDefinition.title());
                    createEvent.setOriginContext(EventOriginContext.elasticsearchMessage(resultMessage.getIndex(), message.getId()));
                    Set<String> buildEventSourceStreams = this.eventStreamService.buildEventSourceStreams(getStreams(aggregationEventProcessorParameters), ImmutableSet.copyOf(message.getStreamIds()));
                    Objects.requireNonNull(createEvent);
                    buildEventSourceStreams.forEach(createEvent::addSourceStream);
                    createEvent.setReplayInfo(EventReplayInfo.builder().timerangeStart(aggregationEventProcessorParameters.timerange().getFrom()).timerangeEnd(aggregationEventProcessorParameters.timerange().getTo()).query(this.config.query()).streams(createEvent.getSourceStreams()).filters(this.config.filters()).build());
                    builder.add(EventWithContext.create(createEvent, message));
                    if (this.config.eventLimit() != 0 && atomicInteger.incrementAndGet() >= this.config.eventLimit()) {
                        eventConsumer.accept(builder.build());
                        throw new EventLimitReachedException();
                    }
                }
                eventConsumer.accept(builder.build());
            });
        } catch (EventLimitReachedException e) {
            this.notificationService.publishIfFirst(this.notificationService.buildNow().addType(Notification.Type.EVENT_LIMIT_REACHED).addKey(this.eventDefinition.id()).addDetail(EventNotificationModelData.FIELD_EVENT_DEFINITION_TITLE, this.eventDefinition.title()).addDetail("event_definition_id", this.eventDefinition.id()).addDetail("event_limit", Integer.valueOf(this.config.eventLimit())).addSeverity(Notification.Severity.NORMAL));
            LOG.debug("Event limit reached at {} for '{}/{}' event definition.", new Object[]{Integer.valueOf(this.config.eventLimit()), this.eventDefinition.title(), this.eventDefinition.id()});
        }
    }

    private void aggregatedSearch(EventFactory eventFactory, AggregationEventProcessorParameters aggregationEventProcessorParameters, EventConsumer<List<EventWithContext>> eventConsumer) throws EventProcessorException {
        AggregationResult doSearch = this.aggregationSearchFactory.create(this.config, aggregationEventProcessorParameters, new AggregationSearch.User("event-processor-aggregation-v1-" + this.eventDefinition.id(), DateTimeZone.UTC), this.eventDefinition, this.eventQueryModifiers.stream().flatMap(eventQuerySearchTypeSupplier -> {
            return eventQuerySearchTypeSupplier.additionalSearchTypes(this.eventDefinition).stream();
        }).toList()).doSearch();
        if (doSearch.keyResults().isEmpty()) {
            LOG.debug("Aggregated search returned empty result set.");
        } else {
            LOG.debug("Got {} (total-aggregated-messages={}) results.", Integer.valueOf(doSearch.keyResults().size()), Long.valueOf(doSearch.totalAggregatedMessages()));
            eventConsumer.accept(eventsFromAggregationResult(eventFactory, aggregationEventProcessorParameters, doSearch));
        }
    }

    private boolean satisfiesConditions(AggregationKeyResult aggregationKeyResult) {
        ImmutableMap immutableMap = (ImmutableMap) aggregationKeyResult.seriesValues().stream().map(aggregationSeriesValue -> {
            return Maps.immutableEntry(aggregationSeriesValue.series().id(), Double.valueOf(aggregationSeriesValue.value()));
        }).collect(ImmutableMap.toImmutableMap((v0) -> {
            return v0.getKey();
        }, (v0) -> {
            return v0.getValue();
        }));
        if (this.config.conditions().isPresent()) {
            return ((Boolean) this.config.conditions().get().expression().map(expression -> {
                return (Boolean) expression.accept(new BooleanNumberConditionsVisitor(immutableMap));
            }).orElse(true)).booleanValue();
        }
        return true;
    }

    @VisibleForTesting
    ImmutableList<EventWithContext> eventsFromAggregationResult(EventFactory eventFactory, AggregationEventProcessorParameters aggregationEventProcessorParameters, AggregationResult aggregationResult) throws EventProcessorException {
        ImmutableList.Builder builder = ImmutableList.builder();
        Set<String> buildEventSourceStreams = this.eventStreamService.buildEventSourceStreams(getStreams(aggregationEventProcessorParameters), aggregationResult.sourceStreams());
        UnmodifiableIterator it = aggregationResult.keyResults().iterator();
        while (it.hasNext()) {
            AggregationKeyResult aggregationKeyResult = (AggregationKeyResult) it.next();
            if (satisfiesConditions(aggregationKeyResult)) {
                String join = String.join((CharSequence) "|", (Iterable<? extends CharSequence>) aggregationKeyResult.key());
                String createEventMessageString = createEventMessageString(join, aggregationKeyResult);
                Event createEvent = eventFactory.createEvent(this.eventDefinition, aggregationKeyResult.timestamp().orElse(aggregationResult.effectiveTimerange().to()), createEventMessageString);
                createEvent.setTimerangeStart((DateTime) aggregationKeyResult.timestamp().map(dateTime -> {
                    return dateTime.minus(this.config.searchWithinMs());
                }).orElse(aggregationEventProcessorParameters.timerange().getFrom()));
                createEvent.setTimerangeEnd(aggregationKeyResult.timestamp().orElse(aggregationEventProcessorParameters.timerange().getTo()));
                createEvent.setReplayInfo(EventReplayInfo.builder().timerangeStart(createEvent.getTimerangeStart()).timerangeEnd(createEvent.getTimerangeEnd()).query(this.config.query()).streams(buildEventSourceStreams).filters(this.config.filters()).build());
                Objects.requireNonNull(createEvent);
                buildEventSourceStreams.forEach(createEvent::addSourceStream);
                HashMap hashMap = new HashMap();
                for (int i = 0; i < this.config.groupBy().size(); i++) {
                    try {
                        hashMap.put(this.config.groupBy().get(i), aggregationKeyResult.key().get(i));
                    } catch (IndexOutOfBoundsException e) {
                        throw new EventProcessorException("Couldn't create events for: " + this.eventDefinition.title() + " (possibly due to non-existing grouping fields)", false, this.eventDefinition.id(), this.eventDefinition, e);
                    }
                }
                createEvent.setGroupByFields((Map) hashMap.entrySet().stream().collect(Collectors.toMap((v0) -> {
                    return v0.getKey();
                }, entry -> {
                    return entry.getValue().toString();
                })));
                UnmodifiableIterator it2 = aggregationKeyResult.seriesValues().iterator();
                while (it2.hasNext()) {
                    AggregationSeriesValue aggregationSeriesValue = (AggregationSeriesValue) it2.next();
                    String lowerCase = aggregationSeriesValue.series().type().toLowerCase(Locale.ROOT);
                    hashMap.put((String) fieldFromSeries(aggregationSeriesValue.series()).map(str -> {
                        return String.format(Locale.ROOT, "aggregation_value_%s_%s", lowerCase, str);
                    }).orElseGet(() -> {
                        return String.format(Locale.ROOT, "aggregation_value_%s", lowerCase);
                    }), Double.valueOf(aggregationSeriesValue.value()));
                }
                hashMap.put("aggregation_key", join);
                Message createMessage = this.messageFactory.createMessage(createEventMessageString, "", aggregationResult.effectiveTimerange().to());
                createMessage.addFields(hashMap);
                Map<String, Object> map = (Map) this.eventQueryModifiers.stream().flatMap(eventQuerySearchTypeSupplier -> {
                    return eventQuerySearchTypeSupplier.eventModifierData(aggregationResult.additionalResults()).entrySet().stream();
                }).collect(Collectors.toMap((v0) -> {
                    return v0.getKey();
                }, (v0) -> {
                    return v0.getValue();
                }));
                LOG.debug("Creating event {}/{} - {} {} ({})", new Object[]{this.eventDefinition.title(), this.eventDefinition.id(), aggregationKeyResult.key(), seriesString(aggregationKeyResult), hashMap});
                builder.add(EventWithContext.builder().event(createEvent).messageContext(createMessage).eventModifierState(map).build());
            } else {
                LOG.debug("Skipping result <{}> because the conditions <{}> don't match", aggregationKeyResult, this.config.conditions());
            }
        }
        return builder.build();
    }

    private Optional<String> fieldFromSeries(SeriesSpec seriesSpec) {
        return seriesSpec instanceof HasField ? Optional.ofNullable(((HasField) seriesSpec).field()) : seriesSpec instanceof HasOptionalField ? ((HasOptionalField) seriesSpec).field() : Optional.empty();
    }

    private String createEventMessageString(String str, AggregationKeyResult aggregationKeyResult) {
        StringBuilder append = new StringBuilder(this.eventDefinition.title()).append(": ");
        if (!aggregationKeyResult.key().isEmpty()) {
            append.append(str).append(" - ");
        }
        append.append(seriesString(aggregationKeyResult));
        return append.toString().trim();
    }

    private String seriesString(AggregationKeyResult aggregationKeyResult) {
        return (String) aggregationKeyResult.seriesValues().stream().map(this::formatSeriesValue).collect(Collectors.joining(" "));
    }

    private String formatSeriesValue(AggregationSeriesValue aggregationSeriesValue) {
        return String.format(Locale.ROOT, "%s=%s", aggregationSeriesValue.series().literal(), Double.valueOf(aggregationSeriesValue.value()));
    }
}
