package org.graylog.events.processor;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.google.auto.value.AutoValue;
import com.google.common.base.Preconditions;
import com.google.inject.assistedinject.Assisted;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import org.graylog.events.configuration.EventsConfigurationProvider;
import org.graylog.events.processor.AutoValue_EventProcessorExecutionJob_Config;
import org.graylog.events.processor.AutoValue_EventProcessorExecutionJob_Data;
import org.graylog.scheduler.Job;
import org.graylog.scheduler.JobDefinitionConfig;
import org.graylog.scheduler.JobDefinitionDto;
import org.graylog.scheduler.JobExecutionContext;
import org.graylog.scheduler.JobExecutionException;
import org.graylog.scheduler.JobScheduleStrategies;
import org.graylog.scheduler.JobTriggerData;
import org.graylog.scheduler.JobTriggerUpdate;
import org.graylog.scheduler.clock.JobSchedulerClock;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

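/**
 * Scheduler {@link Job} that runs one event processor (identified by the event definition ID in its
 * {@link Config}) for a time range and then computes the time range and next execution time for the
 * following run.
 */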
public class EventProcessorExecutionJob implements Job {
    private static final Logger LOG = LoggerFactory.getLogger(EventProcessorExecutionJob.class);
    // Type name used for the Jackson type name and the "type" property of both Config and Data.
    public static final String TYPE_NAME = "event-processor-execution-v1";
    // Delay before a failed execution is retried; passed to retryIn(...) with TimeUnit.MILLISECONDS.
    private static final long RETRY_INTERVAL = 5000;
    // Used to compute the regular next execution time of the trigger.
    private final JobScheduleStrategies scheduleStrategies;
    // Source of "now" (UTC) for all time comparisons in execute().
    private final JobSchedulerClock clock;
    // Runs the actual event processor for an event definition ID and parameters.
    private final EventProcessorEngine eventProcessorEngine;
    // Job definition config (event definition ID, parameters, window and hop size).
    private final Config config;
    // Provides the events configuration; execute() reads the catch-up window from it.
    private final EventsConfigurationProvider configurationProvider;

    /**
     * Job definition configuration for {@link EventProcessorExecutionJob}.
     *
     * <p>Holds the ID of the event definition to execute, the processor parameters (including the
     * time range) and the processing window and hop size. The job adds the window and hop size to
     * timestamps as milliseconds.
     */
    @AutoValue
    @JsonTypeName(EventProcessorExecutionJob.TYPE_NAME)
    // Qualified explicitly: both Config and Data declare a nested type called "Builder".
    @JsonDeserialize(builder = Config.Builder.class)
    public static abstract class Config implements JobDefinitionConfig {
        public static final String FIELD_EVENT_DEFINITION_ID = "event_definition_id";
        private static final String FIELD_PARAMETERS = "parameters";
        private static final String FIELD_PROCESSING_WINDOW_SIZE = "processing_window_size";
        private static final String FIELD_PROCESSING_HOP_SIZE = "processing_hop_size";

        // NOTE: keep the declaration order of the abstract property methods below unchanged;
        // AutoValue derives the order of generated members from it.

        /** @return the ID of the event definition this job executes */
        @JsonProperty(FIELD_EVENT_DEFINITION_ID)
        public abstract String eventDefinitionId();

        /** @return the event processor parameters, including the time range to process */
        @JsonProperty(FIELD_PARAMETERS)
        public abstract EventProcessorParametersWithTimerange parameters();

        /** @return the size of the processing window (used as milliseconds by the job) */
        @JsonProperty(FIELD_PROCESSING_WINDOW_SIZE)
        public abstract long processingWindowSize();

        /** @return the distance the window moves forward per run (used as milliseconds by the job) */
        @JsonProperty(FIELD_PROCESSING_HOP_SIZE)
        public abstract long processingHopSize();

        /** @return a new builder with the type already set to {@link EventProcessorExecutionJob#TYPE_NAME} */
        public static Builder builder() {
            return Builder.create();
        }

        /** @return a builder pre-populated with the values of this config */
        public abstract Builder toBuilder();

        /**
         * Checks whether the given config uses the same processing window and hop size as this one.
         *
         * @param config the config to compare against; must not be null
         * @return {@code true} if both window size and hop size are equal
         */
        public boolean hasEqualSchedule(Config config) {
            return processingWindowSize() == config.processingWindowSize() && processingHopSize() == config.processingHopSize();
        }

        /** Builder for {@link Config}; also used by Jackson for deserialization. */
        @AutoValue.Builder
        public static abstract class Builder implements JobDefinitionConfig.Builder<Builder> {
            @JsonCreator
            public static Builder create() {
                return new AutoValue_EventProcessorExecutionJob_Config.Builder().type(EventProcessorExecutionJob.TYPE_NAME);
            }

            @JsonProperty(Config.FIELD_EVENT_DEFINITION_ID)
            public abstract Builder eventDefinitionId(String str);

            @JsonProperty(Config.FIELD_PARAMETERS)
            public abstract Builder parameters(EventProcessorParametersWithTimerange eventProcessorParametersWithTimerange);

            @JsonProperty(Config.FIELD_PROCESSING_WINDOW_SIZE)
            public abstract Builder processingWindowSize(long j);

            @JsonProperty(Config.FIELD_PROCESSING_HOP_SIZE)
            public abstract Builder processingHopSize(long j);

            abstract Config autoBuild();

            /**
             * Builds the config, always forcing the type to {@link EventProcessorExecutionJob#TYPE_NAME}
             * regardless of any type value set on the builder before.
             */
            public Config build() {
                type(EventProcessorExecutionJob.TYPE_NAME);
                return autoBuild();
            }
        }
    }

    /**
     * Job trigger data for {@link EventProcessorExecutionJob}: the time range the next execution
     * should process.
     */
    @AutoValue
    @JsonTypeName(EventProcessorExecutionJob.TYPE_NAME)
    @JsonDeserialize(builder = Data.Builder.class)
    public static abstract class Data implements JobTriggerData {
        private static final String FIELD_TIMERANGE_FROM = "timerange_from";
        private static final String FIELD_TIMERANGE_TO = "timerange_to";

        /** @return start of the time range to process */
        @JsonProperty(FIELD_TIMERANGE_FROM)
        public abstract DateTime timerangeFrom();

        /** @return end of the time range to process */
        @JsonProperty(FIELD_TIMERANGE_TO)
        public abstract DateTime timerangeTo();

        /**
         * Creates trigger data for the given time range.
         *
         * @param dateTime  start of the time range ("from"); must not be null
         * @param dateTime2 end of the time range ("to"); must not be null and must be after "from"
         * @return the new trigger data
         * @throws NullPointerException     if either timestamp is null
         * @throws IllegalArgumentException if "from" is not before "to"
         */
        public static Data create(DateTime dateTime, DateTime dateTime2) {
            Objects.requireNonNull(dateTime, "from cannot be null");
            Objects.requireNonNull(dateTime2, "to cannot be null");
            Preconditions.checkArgument(dateTime.isBefore(dateTime2), "from must be before to");

            final Builder withFrom = builder().timerangeFrom(dateTime);
            final Builder withRange = withFrom.timerangeTo(dateTime2);
            return withRange.build();
        }

        /** @return a new builder with the type already set to {@link EventProcessorExecutionJob#TYPE_NAME} */
        public static Builder builder() {
            return Builder.create();
        }

        /** @return a builder pre-populated with the values of this object */
        public abstract Builder toBuilder();

        /** Builder for {@link Data}; also used by Jackson for deserialization. */
        @AutoValue.Builder
        public static abstract class Builder implements JobTriggerData.Builder<Builder> {
            @JsonCreator
            public static Builder create() {
                return new AutoValue_EventProcessorExecutionJob_Data.Builder().type(EventProcessorExecutionJob.TYPE_NAME);
            }

            @JsonProperty(Data.FIELD_TIMERANGE_FROM)
            public abstract Builder timerangeFrom(DateTime dateTime);

            @JsonProperty(Data.FIELD_TIMERANGE_TO)
            public abstract Builder timerangeTo(DateTime dateTime);

            abstract Data autoBuild();

            /** Builds the data object; the type is always reset to the job's type name first. */
            public Data build() {
                type(EventProcessorExecutionJob.TYPE_NAME);
                return autoBuild();
            }
        }
    }

    /**
     * Factory for {@link EventProcessorExecutionJob} instances.
     *
     * <p>NOTE(review): presumably implemented via Guice assisted injection, since the job's
     * constructor takes the {@link JobDefinitionDto} as an {@code @Assisted} parameter - confirm
     * against the module bindings.
     */
    public interface Factory extends Job.Factory<EventProcessorExecutionJob> {
        /**
         * Creates a job for the given job definition.
         *
         * @param jobDefinitionDto the job definition the job is created for
         * @return the job instance
         */
        @Override
        EventProcessorExecutionJob create(JobDefinitionDto jobDefinitionDto);
    }

    /**
     * Creates the job for a single job definition.
     *
     * @param jobScheduleStrategies       used to compute the regular next execution time
     * @param jobSchedulerClock           clock providing the current time
     * @param eventProcessorEngine        engine that executes the event processor
     * @param eventsConfigurationProvider provider of the events configuration
     * @param jobDefinitionDto            job definition; its config must be a {@link Config}
     * @throws ClassCastException if the job definition's config is not a {@link Config}
     */
    @Inject
    public EventProcessorExecutionJob(JobScheduleStrategies jobScheduleStrategies,
                                      JobSchedulerClock jobSchedulerClock,
                                      EventProcessorEngine eventProcessorEngine,
                                      EventsConfigurationProvider eventsConfigurationProvider,
                                      @Assisted JobDefinitionDto jobDefinitionDto) {
        // The definition's config is stored with its concrete type so execute() can read the schedule values.
        this.config = (Config) jobDefinitionDto.config();
        this.configurationProvider = eventsConfigurationProvider;
        this.eventProcessorEngine = eventProcessorEngine;
        this.clock = jobSchedulerClock;
        this.scheduleStrategies = jobScheduleStrategies;
    }

    /**
     * Executes the event processor for the current time range and computes the trigger update
     * (next execution time and next time range) for the following run.
     *
     * <p>The time range is taken from the trigger's {@link Data} when present, otherwise from the
     * parameters stored in the job's {@link Config}. If the end of the time range is still in the
     * future, nothing is executed and the trigger is re-scheduled to run at that time. Failures of
     * the event processor lead to a retry after {@code RETRY_INTERVAL} milliseconds, except for
     * permanent {@link EventProcessorException}s, which put the trigger into the error state.
     *
     * @param jobExecutionContext the execution context providing the trigger and job definition
     * @return the update to apply to the job trigger
     * @throws JobExecutionException if the "to" timestamp of the time range is not after "from"
     */
    @Override // org.graylog.scheduler.Job
    public JobTriggerUpdate execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        final Optional<Data> triggerData = jobExecutionContext.trigger().data().map(jobTriggerData -> (Data) jobTriggerData);

        // Time range stored in the trigger data takes precedence over the one in the job config.
        final EventProcessorParametersWithTimerange parameters;
        if (triggerData.isPresent()) {
            final Data data = triggerData.get();
            LOG.trace("Using timerange from job trigger data: from={} to={} (trigger={})", data.timerangeFrom(), data.timerangeTo(), jobExecutionContext.trigger().id());
            parameters = this.config.parameters().withTimerange(data.timerangeFrom(), data.timerangeTo());
        } else {
            parameters = this.config.parameters();
        }

        final DateTime from = parameters.timerange().getFrom();
        final DateTime to = parameters.timerange().getTo();
        if (!to.isAfter(from)) {
            throw new JobExecutionException("Invalid time range - \"to\" timestamp <" + to.toString() + "> is not after \"from\" timestamp <" + from.toString() + ">", jobExecutionContext.trigger(), JobTriggerUpdate.withError(jobExecutionContext.trigger()));
        }

        final DateTime now = this.clock.nowUTC();
        if (now.isBefore(to)) {
            LOG.error("The end of the timerange to process is in the future, re-scheduling job trigger <{}> to run at <{}>", jobExecutionContext.trigger().id(), to);
            return JobTriggerUpdate.withNextTime(to);
        }

        try {
            this.eventProcessorEngine.execute(this.config.eventDefinitionId(), parameters);

            final long hopSize = this.config.processingHopSize();
            final long windowSize = this.config.processingWindowSize();

            // Move the window forward by the hop size; the window always ends at nextTo.
            DateTime nextTo = to.plus(hopSize);
            final DateTime nextFrom = nextTo.minus(windowSize);

            // When the processed range lags behind "now" by more than the catch-up window, extend
            // the next range to a multiple of the window size so old data is processed in bigger
            // chunks. windowSize must be positive: it is used as divisor below, and a zero value
            // would otherwise raise an ArithmeticException after the processor already ran.
            final long catchUpSize = this.configurationProvider.m18get().eventCatchupWindow();
            if (windowSize > 0 && catchUpSize > windowSize && to.plus(catchUpSize).isBefore(now) && hopSize <= windowSize) {
                final long chunkCount = catchUpSize / windowSize;
                nextTo = to.plus(windowSize * chunkCount);
                LOG.info("Event processor <{}> is catching up on old data. Combining {} search windows with catchUpWindowSize={}ms: from={} to={}", this.config.eventDefinitionId(), chunkCount, catchUpSize, nextFrom, nextTo);
            }

            LOG.trace("Set new timerange of eventproc <{}> in job trigger data: from={} to={} (hopSize={}ms windowSize={}ms)", this.config.eventDefinitionId(), nextFrom, nextTo, hopSize, windowSize);

            // Re-use the existing trigger data when present; only create a fresh builder if needed.
            final Data nextData = triggerData.map(Data::toBuilder)
                    .orElseGet(Data::builder)
                    .timerangeFrom(nextFrom)
                    .timerangeTo(nextTo)
                    .build();

            final Optional<DateTime> nextTime = this.scheduleStrategies.nextTime(jobExecutionContext.trigger());
            if (!nextTime.isPresent()) {
                LOG.trace("No nextTime for trigger <{}>", jobExecutionContext.trigger().id());
                return JobTriggerUpdate.withoutNextTime();
            }
            if (nextTo.isBefore(now)) {
                // Still behind: run again right away instead of waiting for the regular schedule.
                LOG.trace("Set nextTime to <{}> to catch up faster - calculated nextTime was <{}>", now, nextTime.get());
                return JobTriggerUpdate.withNextTimeAndData(now, nextData);
            }
            if (nextTo.isBefore(nextTime.get())) {
                LOG.trace("Set nextTime to <{}> because it's closer to the timerange time - calculated nextTime was <{}>", nextTo, nextTime.get());
                return JobTriggerUpdate.withNextTimeAndData(nextTo, nextData);
            }
            LOG.trace("Set nextTime to <{}>", nextTime.get());
            return JobTriggerUpdate.withNextTimeAndData(nextTime.get(), nextData);
        } catch (EventProcessorPreconditionException e) {
            if (e.getEventDefinition().isPresent()) {
                LOG.debug("Event processor <{}/{}> couldn't be executed because of a failed precondition (retry in {} ms)", e.getEventDefinition().get().title(), e.getEventDefinitionId(), RETRY_INTERVAL);
            } else {
                LOG.debug("Event processor <{}> couldn't be executed because of a failed precondition (retry in {} ms)", e.getEventDefinitionId(), RETRY_INTERVAL);
            }
            return jobExecutionContext.jobTriggerUpdates().retryIn(RETRY_INTERVAL, TimeUnit.MILLISECONDS);
        } catch (EventProcessorException e) {
            // The exception is passed as last argument so the logger records the stack trace.
            if (e.getEventDefinition().isPresent()) {
                LOG.error("Event processor <{}/{}> failed to execute: {} (retry in {} ms)", e.getEventDefinition().get().config().type(), e.getEventDefinitionId(), e.getMessage(), RETRY_INTERVAL, e);
            } else {
                LOG.error("Event processor <{}> failed to execute: {} (retry in {} ms)", e.getEventDefinitionId(), e.getMessage(), RETRY_INTERVAL, e);
            }
            if (!e.isPermanent()) {
                return jobExecutionContext.jobTriggerUpdates().retryIn(RETRY_INTERVAL, TimeUnit.MILLISECONDS);
            }
            LOG.error("Caught a permanent error, trigger <{}> will go into ERROR state - it will not be executed anymore and needs manual intervention! (event-definition-id: {} job-definition={}/{})", jobExecutionContext.trigger().id(), e.getEventDefinitionId(), jobExecutionContext.definition().id(), jobExecutionContext.definition().title());
            return JobTriggerUpdate.withError(jobExecutionContext.trigger());
        } catch (Exception e) {
            LOG.error("Event processor <{}> failed to execute: parameters={} (retry in {} ms)", this.config.eventDefinitionId(), parameters, RETRY_INTERVAL, e);
            return jobExecutionContext.jobTriggerUpdates().retryIn(RETRY_INTERVAL, TimeUnit.MILLISECONDS);
        }
    }
}
